From 7e66f5f679033dd8c14a28e823b90ae7090b5819 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 28 Apr 2026 18:37:40 -0700 Subject: [PATCH 1/9] Add NoMatchReasonRateLimited and plumb to hooks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add NoMatchReasonRateLimited to the NoMatchReason enum. When findAndWakeMatches detects rate limiting, MatchTaskImmediately returns this reason, which flows through Offer → TrySyncMatch → processTaskAddHooks. At the hooks boundary, internal NoMatchReason is translated to hooks.NoMatchReason (which only exposes Unspecified and RateLimited). Co-Authored-By: Claude Opus 4.6 --- .../matching/hooks/task_lifecycle_hooks.go | 11 +++ service/matching/matcher_data.go | 15 ++-- service/matching/matcher_data_test.go | 19 +++++ .../physical_task_queue_manager_test.go | 4 +- .../matching/task_queue_partition_manager.go | 21 ++++-- .../task_queue_partition_manager_test.go | 71 +++++++++++++++++++ 6 files changed, 130 insertions(+), 11 deletions(-) diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index 7f7d3b7a96f..06e55774e8a 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -9,6 +9,16 @@ import ( "go.temporal.io/server/common/tqid" ) +// NoMatchReason describes why a task was not sync-matched. +type NoMatchReason int + +const ( + // No specific reason provided. + NoMatchReasonUnspecified NoMatchReason = iota + // A poller was available but rate limiting blocked the match. + NoMatchReasonRateLimited +) + type ( // TaskQueuePartition is a simplified version of tqid.Partition that removes details // the hooks should not concern themselves with @@ -26,6 +36,7 @@ type ( TaskAddHookDetails struct { DeploymentVersion *deploymentpb.WorkerDeploymentVersion IsSyncMatch bool + NoMatchReason NoMatchReason } TaskHookFactory interface { diff --git a/service/matching/matcher_data.go b/service/matching/matcher_data.go index b05895b3c2f..03d347ac1cb 100644 --- a/service/matching/matcher_data.go +++ b/service/matching/matcher_data.go @@ -369,6 +369,8 @@ const ( NoMatchReasonBacklogged // Sync match was attempted but no poller was available. NoMatchReasonNoPoller + // A poller was available but rate limiting blocked the match. + NoMatchReasonRateLimited ) // MatchTaskImmediately attempts a non-blocking sync match. Returns whether the task was matched @@ -388,12 +390,15 @@ func (d *matcherData) MatchTaskImmediately(task *internalTask) syncMatchResult { task.initMatch(d) d.tasks.Add(task) - d.findAndWakeMatches() + rateLimited := d.findAndWakeMatches() // don't wait, check if match() picked this one already if task.matchResult != nil { return syncMatchResult{matched: true} } d.tasks.Remove(task) + if rateLimited { + return syncMatchResult{reason: NoMatchReasonRateLimited} + } return syncMatchResult{reason: NoMatchReasonNoPoller} } @@ -503,8 +508,8 @@ func (d *matcherData) allowForwarding() (allowForwarding bool) { return delayToForwardingAllowed <= 0 } -// call with lock held -func (d *matcherData) findAndWakeMatches() { +// call with lock held. Returns true if a match was found but blocked by rate limiting. +func (d *matcherData) findAndWakeMatches() (rateLimited bool) { allowForwarding := d.canForward && d.allowForwarding() now := d.timeSource.Now().UnixNano() @@ -516,14 +521,14 @@ func (d *matcherData) findAndWakeMatches() { if task == nil || poller == nil { // no more current matches, stop rate limit timer if was running d.rateLimitTimer.unset() - return + return false } // check ready time delay := d.rateLimitManager.readyTimeForTask(task).delay(now) d.rateLimitTimer.set(d.timeSource, d.rematchAfterTimer, delay) if delay > 0 { - return // not ready yet, timer will call match later + return true // not ready yet, timer will call match later } // ready to signal match diff --git a/service/matching/matcher_data_test.go b/service/matching/matcher_data_test.go index 959f82b2825..089c3a658da 100644 --- a/service/matching/matcher_data_test.go +++ b/service/matching/matcher_data_test.go @@ -214,6 +214,25 @@ func (s *MatcherDataSuite) TestMatchTaskImmediately() { s.Equal(t, pres.task) } +func (s *MatcherDataSuite) TestMatchTaskImmediatelyRateLimited() { + // Set rate limit to zero — blocks all matches. + s.md.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API) + s.md.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0) + + // Add a waiting poller. + go func() { + poller := &waitingPoller{startTime: s.now()} + s.md.EnqueuePollerAndWait(nil, poller) + }() + s.waitForPollers(1) + + // Sync match should fail due to rate limiting, not lack of poller. + t := s.newSyncTask(nil) + result := s.md.MatchTaskImmediately(t) + s.False(result.matched) + s.Equal(NoMatchReasonRateLimited, result.reason) +} + func (s *MatcherDataSuite) TestMatchTaskImmediatelyDisabledBacklog() { // register some backlog with old tasks s.md.EnqueueTaskNoWait(s.newBacklogTask(123, 10*time.Minute, nil)) diff --git a/service/matching/physical_task_queue_manager_test.go b/service/matching/physical_task_queue_manager_test.go index 37e0471b61e..ad638f52168 100644 --- a/service/matching/physical_task_queue_manager_test.go +++ b/service/matching/physical_task_queue_manager_test.go @@ -134,9 +134,9 @@ func TestReaderSignaling(t *testing.T) { task := newInternalTaskForSyncMatch(&persistencespb.TaskInfo{ CreateTime: timestamp.TimePtr(time.Now().UTC()), }, nil) - sync, err := s.tqMgr.TrySyncMatch(context.TODO(), task) + syncResult, err := s.tqMgr.TrySyncMatch(context.TODO(), task) require.NoError(t, err) - require.True(t, sync) + require.True(t, syncResult.matched) require.Len(t, readerNotifications, 0, "Sync match should not signal taskReader") } diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index e9d44f83936..401ec9d592c 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -439,15 +439,18 @@ reredirectTask: return "", false, err } + var result syncMatchResult if isActive { - var result syncMatchResult result, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask) syncMatched = result.matched + // TODO: TrySyncMatch returns matched=true with a start error (e.g. busy workflow), + // but the task is not actually dispatched. It should return matched=false with an + // appropriate NoMatchReason so callers don't have to re-check errors. if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) { // Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired // on the child partition that originally received the task. if params.forwardInfo == nil { - pm.processTaskAddHooks(ctx, targetVersion, syncMatched) + pm.processTaskAddHooks(ctx, targetVersion, syncMatched, result.reason) } // Build ID is not returned for sync match. The returned build ID is used by History to update @@ -476,17 +479,27 @@ reredirectTask: err = spoolQueue.SpoolTask(params.taskInfo) if err == nil { - pm.processTaskAddHooks(ctx, targetVersion, false) + pm.processTaskAddHooks(ctx, targetVersion, false, result.reason) } return assignedBuildId, false, err } -func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, syncMatched bool) { +func noMatchReasonToHook(reason NoMatchReason) hooks.NoMatchReason { + switch reason { + case NoMatchReasonRateLimited: + return hooks.NoMatchReasonRateLimited + default: + return hooks.NoMatchReasonUnspecified + } +} + +func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, syncMatched bool, reason NoMatchReason) { for _, l := range pm.taskHooks { l.ProcessTaskAdd(ctx, &hooks.TaskAddHookDetails{ DeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromVersion(targetVersion), IsSyncMatch: syncMatched, + NoMatchReason: noMatchReasonToHook(reason), }) } } diff --git a/service/matching/task_queue_partition_manager_test.go b/service/matching/task_queue_partition_manager_test.go index ec69580fd1d..4472a832170 100644 --- a/service/matching/task_queue_partition_manager_test.go +++ b/service/matching/task_queue_partition_manager_test.go @@ -1351,6 +1351,7 @@ type capturedTaskMatchDetails struct { TaskQueueName string TaskQueueType enumspb.TaskQueueType IsSyncMatch bool + NoMatchReason hooks.NoMatchReason DeploymentVersion *deploymentpb.WorkerDeploymentVersion } @@ -1373,6 +1374,7 @@ func (h *capturingTaskMatchHook) ProcessTaskAdd(ctx context.Context, event *hook TaskQueueName: h.taskQueueName, TaskQueueType: h.taskQueueType, IsSyncMatch: event.IsSyncMatch, + NoMatchReason: event.NoMatchReason, } if event.DeploymentVersion != nil { details.DeploymentVersion = &deploymentpb.WorkerDeploymentVersion{ @@ -1727,6 +1729,75 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedNoSyncMatch_HooksN s.Require().Empty(hook.getCalls()) } +func (s *PartitionManagerTestSuite) TestTaskAddHooks_RateLimited() { + if !s.newMatcher { + s.T().Skip("rate limiting signal from matcher is only available in new matcher") + } + hook := &capturingTaskMatchHook{} + pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook}) + defer cleanup() + + // Set rate limit to zero RPS — this blocks all sync matches due to rate limiting. + pm.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API) + pm.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0) + + // Set up a waiting poller so sync match would succeed if not rate-limited. + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + task, _, _ := pm.PollTask(ctx, &pollMetadata{ + workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: "", + UseVersioning: false, + }, + }) + if task != nil && task.responseC != nil { + close(task.responseC) + } + }() + pq := pm.defaultQueue().(*physicalTaskQueueManagerImpl) + s.Require().Eventually(pq.matcher.HasWaitingPoller, 2*time.Second, time.Millisecond) + + // AddTask should fall through to spool because rate limiting blocked sync match. + _, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{ + taskInfo: &persistencespb.TaskInfo{ + NamespaceId: namespaceID, + RunId: "run", + WorkflowId: "wf", + }, + }) + s.Require().NoError(err) + s.Require().False(syncMatched) + + calls := hook.getCalls() + s.Require().Len(calls, 1) + s.False(calls[0].IsSyncMatch) + s.Equal(hooks.NoMatchReasonRateLimited, calls[0].NoMatchReason) +} + +func (s *PartitionManagerTestSuite) TestTaskAddHooks_NotRateLimited() { + hook := &capturingTaskMatchHook{} + pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook}) + defer cleanup() + + // No rate limiting configured — task should spool normally without rate limit flag. + _, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{ + taskInfo: &persistencespb.TaskInfo{ + NamespaceId: namespaceID, + RunId: "run", + WorkflowId: "wf", + VersionDirective: worker_versioning.MakeBuildIdDirective("buildXYZ"), + }, + }) + s.Require().NoError(err) + s.Require().False(syncMatched) + + calls := hook.getCalls() + s.Require().Len(calls, 1) + s.False(calls[0].IsSyncMatch) + s.Equal(hooks.NoMatchReasonUnspecified, calls[0].NoMatchReason) +} + func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { hook1 := &capturingTaskMatchHook{} hook2 := &capturingTaskMatchHook{} From ccb2508ecafdd5060a33e5d9e802ede4d4d9cdc3 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:18:45 -0700 Subject: [PATCH 2/9] Replace IsSyncMatch bool with SyncMatchOutcome enum in hooks API Replace the IsSyncMatch bool and NoMatchReason enum with a single SyncMatchOutcome enum in the hooks API. The internal syncMatchOutcome is translated to the hooks-specific enum at the boundary. Co-Authored-By: Claude Opus 4.6 --- .../matching/hooks/task_lifecycle_hooks.go | 15 ++++++------ .../matching/task_queue_partition_manager.go | 17 ++++++------- .../task_queue_partition_manager_test.go | 24 ++++++++----------- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index 06e55774e8a..e2dd74e44f2 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -9,14 +9,16 @@ import ( "go.temporal.io/server/common/tqid" ) -// NoMatchReason describes why a task was not sync-matched. -type NoMatchReason int +// SyncMatchOutcome describes the outcome of a sync match attempt from the hook's perspective. +type SyncMatchOutcome int const ( - // No specific reason provided. - NoMatchReasonUnspecified NoMatchReason = iota + // The outcome is not specified or not relevant to the hook. + SyncMatchOutcomeUnspecified SyncMatchOutcome = iota + // The task was sync-matched successfully. + SyncMatchOutcomeSuccess // A poller was available but rate limiting blocked the match. - NoMatchReasonRateLimited + SyncMatchOutcomeRateLimited ) type ( @@ -35,8 +37,7 @@ type ( } TaskAddHookDetails struct { DeploymentVersion *deploymentpb.WorkerDeploymentVersion - IsSyncMatch bool - NoMatchReason NoMatchReason + SyncMatchOutcome SyncMatchOutcome } TaskHookFactory interface { diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index e2b8112f0f4..66544280390 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -447,7 +447,7 @@ reredirectTask: // Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired // on the child partition that originally received the task. if params.forwardInfo == nil { - pm.processTaskAddHooks(ctx, targetVersion, syncMatched, outcome) + pm.processTaskAddHooks(ctx, targetVersion, outcome) } // Build ID is not returned for sync match. The returned build ID is used by History to update @@ -476,27 +476,28 @@ reredirectTask: err = spoolQueue.SpoolTask(params.taskInfo) if err == nil { - pm.processTaskAddHooks(ctx, targetVersion, false, outcome) + pm.processTaskAddHooks(ctx, targetVersion, outcome) } return assignedBuildId, false, err } -func noMatchReasonToHook(outcome syncMatchOutcome) hooks.NoMatchReason { +func syncMatchOutcomeToHook(outcome syncMatchOutcome) hooks.SyncMatchOutcome { switch outcome { + case syncMatchSuccess: + return hooks.SyncMatchOutcomeSuccess case syncMatchRateLimited: - return hooks.NoMatchReasonRateLimited + return hooks.SyncMatchOutcomeRateLimited default: - return hooks.NoMatchReasonUnspecified + return hooks.SyncMatchOutcomeUnspecified } } -func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, syncMatched bool, outcome syncMatchOutcome) { +func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, outcome syncMatchOutcome) { for _, l := range pm.taskHooks { l.ProcessTaskAdd(ctx, &hooks.TaskAddHookDetails{ DeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromVersion(targetVersion), - IsSyncMatch: syncMatched, - NoMatchReason: noMatchReasonToHook(outcome), + SyncMatchOutcome: syncMatchOutcomeToHook(outcome), }) } } diff --git a/service/matching/task_queue_partition_manager_test.go b/service/matching/task_queue_partition_manager_test.go index 4472a832170..8dff543cce3 100644 --- a/service/matching/task_queue_partition_manager_test.go +++ b/service/matching/task_queue_partition_manager_test.go @@ -1348,10 +1348,9 @@ type capturingTaskMatchHook struct { } type capturedTaskMatchDetails struct { - TaskQueueName string - TaskQueueType enumspb.TaskQueueType - IsSyncMatch bool - NoMatchReason hooks.NoMatchReason + TaskQueueName string + TaskQueueType enumspb.TaskQueueType + SyncMatchOutcome hooks.SyncMatchOutcome DeploymentVersion *deploymentpb.WorkerDeploymentVersion } @@ -1373,8 +1372,7 @@ func (h *capturingTaskMatchHook) ProcessTaskAdd(ctx context.Context, event *hook details := capturedTaskMatchDetails{ TaskQueueName: h.taskQueueName, TaskQueueType: h.taskQueueType, - IsSyncMatch: event.IsSyncMatch, - NoMatchReason: event.NoMatchReason, + SyncMatchOutcome: event.SyncMatchOutcome, } if event.DeploymentVersion != nil { details.DeploymentVersion = &deploymentpb.WorkerDeploymentVersion{ @@ -1612,7 +1610,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookSyncMatch() { s.Require().Len(calls, 1) s.Equal(taskQueueName, calls[0].TaskQueueName) s.Equal(enumspb.TASK_QUEUE_TYPE_WORKFLOW, calls[0].TaskQueueType) - s.True(calls[0].IsSyncMatch) + s.Equal(hooks.SyncMatchOutcomeSuccess, calls[0].SyncMatchOutcome) s.Nil(calls[0].DeploymentVersion) } @@ -1636,7 +1634,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookNoSyncMatch() { s.Require().Len(calls, 1) s.Equal(taskQueueName, calls[0].TaskQueueName) s.Equal(enumspb.TASK_QUEUE_TYPE_WORKFLOW, calls[0].TaskQueueType) - s.False(calls[0].IsSyncMatch) + s.Equal(hooks.SyncMatchOutcomeUnspecified, calls[0].SyncMatchOutcome) } func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedSyncMatch_HooksNotInvoked() { @@ -1771,8 +1769,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_RateLimited() { calls := hook.getCalls() s.Require().Len(calls, 1) - s.False(calls[0].IsSyncMatch) - s.Equal(hooks.NoMatchReasonRateLimited, calls[0].NoMatchReason) + s.Equal(hooks.SyncMatchOutcomeRateLimited, calls[0].SyncMatchOutcome) } func (s *PartitionManagerTestSuite) TestTaskAddHooks_NotRateLimited() { @@ -1794,8 +1791,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_NotRateLimited() { calls := hook.getCalls() s.Require().Len(calls, 1) - s.False(calls[0].IsSyncMatch) - s.Equal(hooks.NoMatchReasonUnspecified, calls[0].NoMatchReason) + s.Equal(hooks.SyncMatchOutcomeUnspecified, calls[0].SyncMatchOutcome) } func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { @@ -1815,8 +1811,8 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { s.Len(hook1.getCalls(), 1) s.Len(hook2.getCalls(), 1) - s.False(hook1.getCalls()[0].IsSyncMatch) - s.False(hook2.getCalls()[0].IsSyncMatch) + s.Equal(hooks.SyncMatchOutcomeUnspecified, hook1.getCalls()[0].SyncMatchOutcome) + s.Equal(hooks.SyncMatchOutcomeUnspecified, hook2.getCalls()[0].SyncMatchOutcome) } type mockUserDataManager struct { From f6045b648953a9b0580d7808e381570dbbbc0021 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:20:00 -0700 Subject: [PATCH 3/9] Rename SyncMatchOutcomeUnspecified to SyncMatchOutcomeNotMatched Co-Authored-By: Claude Opus 4.6 --- service/matching/hooks/task_lifecycle_hooks.go | 5 +++-- service/matching/task_queue_partition_manager.go | 2 +- service/matching/task_queue_partition_manager_test.go | 8 ++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index e2dd74e44f2..679ecbbf529 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -13,8 +13,9 @@ import ( type SyncMatchOutcome int const ( - // The outcome is not specified or not relevant to the hook. - SyncMatchOutcomeUnspecified SyncMatchOutcome = iota + // The task was not sync-matched. This is the default for all non-match cases + // (e.g. no poller available, backlog too deep, etc.). + SyncMatchOutcomeNotMatched SyncMatchOutcome = iota // The task was sync-matched successfully. SyncMatchOutcomeSuccess // A poller was available but rate limiting blocked the match. diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 66544280390..156dba564f9 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -489,7 +489,7 @@ func syncMatchOutcomeToHook(outcome syncMatchOutcome) hooks.SyncMatchOutcome { case syncMatchRateLimited: return hooks.SyncMatchOutcomeRateLimited default: - return hooks.SyncMatchOutcomeUnspecified + return hooks.SyncMatchOutcomeNotMatched } } diff --git a/service/matching/task_queue_partition_manager_test.go b/service/matching/task_queue_partition_manager_test.go index 8dff543cce3..385da73898d 100644 --- a/service/matching/task_queue_partition_manager_test.go +++ b/service/matching/task_queue_partition_manager_test.go @@ -1634,7 +1634,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookNoSyncMatch() { s.Require().Len(calls, 1) s.Equal(taskQueueName, calls[0].TaskQueueName) s.Equal(enumspb.TASK_QUEUE_TYPE_WORKFLOW, calls[0].TaskQueueType) - s.Equal(hooks.SyncMatchOutcomeUnspecified, calls[0].SyncMatchOutcome) + s.Equal(hooks.SyncMatchOutcomeNotMatched, calls[0].SyncMatchOutcome) } func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedSyncMatch_HooksNotInvoked() { @@ -1791,7 +1791,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_NotRateLimited() { calls := hook.getCalls() s.Require().Len(calls, 1) - s.Equal(hooks.SyncMatchOutcomeUnspecified, calls[0].SyncMatchOutcome) + s.Equal(hooks.SyncMatchOutcomeNotMatched, calls[0].SyncMatchOutcome) } func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { @@ -1811,8 +1811,8 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { s.Len(hook1.getCalls(), 1) s.Len(hook2.getCalls(), 1) - s.Equal(hooks.SyncMatchOutcomeUnspecified, hook1.getCalls()[0].SyncMatchOutcome) - s.Equal(hooks.SyncMatchOutcomeUnspecified, hook2.getCalls()[0].SyncMatchOutcome) + s.Equal(hooks.SyncMatchOutcomeNotMatched, hook1.getCalls()[0].SyncMatchOutcome) + s.Equal(hooks.SyncMatchOutcomeNotMatched, hook2.getCalls()[0].SyncMatchOutcome) } type mockUserDataManager struct { From c9ea9d6aaea33c38d1b56225d62cb383730783f9 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:20:35 -0700 Subject: [PATCH 4/9] Remove irrelevant examples from hook enum comment Co-Authored-By: Claude Opus 4.6 --- service/matching/hooks/task_lifecycle_hooks.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index 679ecbbf529..b80b12eb5bf 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -13,8 +13,7 @@ import ( type SyncMatchOutcome int const ( - // The task was not sync-matched. This is the default for all non-match cases - // (e.g. no poller available, backlog too deep, etc.). + // The task was not sync-matched. SyncMatchOutcomeNotMatched SyncMatchOutcome = iota // The task was sync-matched successfully. SyncMatchOutcomeSuccess From d94276cf3b67d8f88b90a0f779010cb40b8ede8b Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:22:53 -0700 Subject: [PATCH 5/9] Keep deprecated IsSyncMatch for backwards compatibility Retain the IsSyncMatch bool field in TaskAddHookDetails, derived from SyncMatchOutcome, so existing hook consumers are not broken. Co-Authored-By: Claude Opus 4.6 --- service/matching/hooks/task_lifecycle_hooks.go | 1 + service/matching/task_queue_partition_manager.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index b80b12eb5bf..bf7c615df1f 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -37,6 +37,7 @@ type ( } TaskAddHookDetails struct { DeploymentVersion *deploymentpb.WorkerDeploymentVersion + IsSyncMatch bool // Deprecated: use SyncMatchOutcome instead. SyncMatchOutcome SyncMatchOutcome } diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 156dba564f9..444aa0e11a8 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -495,9 +495,11 @@ func syncMatchOutcomeToHook(outcome syncMatchOutcome) hooks.SyncMatchOutcome { func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, outcome syncMatchOutcome) { for _, l := range pm.taskHooks { + hookOutcome := syncMatchOutcomeToHook(outcome) l.ProcessTaskAdd(ctx, &hooks.TaskAddHookDetails{ DeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromVersion(targetVersion), - SyncMatchOutcome: syncMatchOutcomeToHook(outcome), + IsSyncMatch: hookOutcome == hooks.SyncMatchOutcomeSuccess, + SyncMatchOutcome: hookOutcome, }) } } From df5fcc6e20ac9335936d1b73ae47839de28ee8a3 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:23:17 -0700 Subject: [PATCH 6/9] Add SyncMatchOutcomeUnspecified as zero value Ensures uninitialized SyncMatchOutcome fields are distinguishable from intentional NotMatched values. Co-Authored-By: Claude Opus 4.6 --- service/matching/hooks/task_lifecycle_hooks.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index bf7c615df1f..a1391c38062 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -13,8 +13,10 @@ import ( type SyncMatchOutcome int const ( + // Default zero value; should not be used explicitly. + SyncMatchOutcomeUnspecified SyncMatchOutcome = iota // The task was not sync-matched. - SyncMatchOutcomeNotMatched SyncMatchOutcome = iota + SyncMatchOutcomeNotMatched // The task was sync-matched successfully. SyncMatchOutcomeSuccess // A poller was available but rate limiting blocked the match. From e57d86be8536a938a5b8a3878afd11d49ce6574c Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:28:15 -0700 Subject: [PATCH 7/9] Clarify SyncMatchOutcomeNotMatched is a catch-all Co-Authored-By: Claude Opus 4.6 --- service/matching/hooks/task_lifecycle_hooks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index a1391c38062..5ce110cd5fd 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -15,7 +15,7 @@ type SyncMatchOutcome int const ( // Default zero value; should not be used explicitly. SyncMatchOutcomeUnspecified SyncMatchOutcome = iota - // The task was not sync-matched. + // The task was not sync-matched. Catch-all for reasons not covered by more specific outcomes. SyncMatchOutcomeNotMatched // The task was sync-matched successfully. SyncMatchOutcomeSuccess From 5b8a4f70256551bebbd0552396d07b7579f11595 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:28:55 -0700 Subject: [PATCH 8/9] Handle syncMatchUnspecified in hook translation Map syncMatchUnspecified to hooks.SyncMatchOutcomeUnspecified so the bug signal propagates rather than being silently mapped to NotMatched. Co-Authored-By: Claude Opus 4.6 --- service/matching/task_queue_partition_manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 444aa0e11a8..558f8334072 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -488,6 +488,8 @@ func syncMatchOutcomeToHook(outcome syncMatchOutcome) hooks.SyncMatchOutcome { return hooks.SyncMatchOutcomeSuccess case syncMatchRateLimited: return hooks.SyncMatchOutcomeRateLimited + case syncMatchUnspecified: + return hooks.SyncMatchOutcomeUnspecified default: return hooks.SyncMatchOutcomeNotMatched } From 82cf5976348bf427103e2531562a8e08ab53c312 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 29 Apr 2026 11:57:42 -0700 Subject: [PATCH 9/9] Fix struct field alignment formatting Co-Authored-By: Claude Opus 4.6 --- service/matching/task_queue_partition_manager_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/service/matching/task_queue_partition_manager_test.go b/service/matching/task_queue_partition_manager_test.go index 385da73898d..f7e415fe586 100644 --- a/service/matching/task_queue_partition_manager_test.go +++ b/service/matching/task_queue_partition_manager_test.go @@ -1348,9 +1348,9 @@ type capturingTaskMatchHook struct { } type capturedTaskMatchDetails struct { - TaskQueueName string - TaskQueueType enumspb.TaskQueueType - SyncMatchOutcome hooks.SyncMatchOutcome + TaskQueueName string + TaskQueueType enumspb.TaskQueueType + SyncMatchOutcome hooks.SyncMatchOutcome DeploymentVersion *deploymentpb.WorkerDeploymentVersion } @@ -1370,8 +1370,8 @@ func (h *capturingTaskMatchHook) ProcessTaskAdd(ctx context.Context, event *hook h.mu.Lock() defer h.mu.Unlock() details := capturedTaskMatchDetails{ - TaskQueueName: h.taskQueueName, - TaskQueueType: h.taskQueueType, + TaskQueueName: h.taskQueueName, + TaskQueueType: h.taskQueueType, SyncMatchOutcome: event.SyncMatchOutcome, } if event.DeploymentVersion != nil {