diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 3cc691be09..834c6f4ab8 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -442,7 +442,11 @@ reredirectTask: if isActive { syncMatched, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask) if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) { - pm.processTaskAddHooks(ctx, targetVersion, syncMatched) + // Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired + // on the child partition that originally received the task. + if params.forwardInfo == nil { + pm.processTaskAddHooks(ctx, targetVersion, syncMatched) + } // Build ID is not returned for sync match. The returned build ID is used by History to update // mutable state (and visibility) when the first workflow task is spooled. diff --git a/service/matching/task_queue_partition_manager_test.go b/service/matching/task_queue_partition_manager_test.go index 780f50fbc7..ec69580fd1 100644 --- a/service/matching/task_queue_partition_manager_test.go +++ b/service/matching/task_queue_partition_manager_test.go @@ -1637,6 +1637,96 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookNoSyncMatch() { s.False(calls[0].IsSyncMatch) } +func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedSyncMatch_HooksNotInvoked() { + // When a task is forwarded from a child partition and sync-matched on the parent, + // hooks should not fire on the parent because the child already fired them. + hook := &capturingTaskMatchHook{} + pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook}) + defer cleanup() + + type pollResult struct { + task *internalTask + err error + } + pollDone := make(chan pollResult, 1) + + // Start a poller in a background goroutine so there's someone to sync-match with. + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + task, _, err := pm.PollTask(ctx, &pollMetadata{ + workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: "", + UseVersioning: false, + }, + }) + pollDone <- pollResult{task: task, err: err} + if task != nil && task.responseC != nil { + close(task.responseC) + } + }() + + // Wait until the poller is actually blocked in the matcher, ready to receive a task. + // This guarantees the subsequent AddTask will sync-match rather than spool. + pq := pm.defaultQueue().(*physicalTaskQueueManagerImpl) + s.Require().Eventually(pq.matcher.HasWaitingPoller, 2*time.Second, time.Millisecond) + + // Add a forwarded task (simulating a child partition forwarding to this parent). + // With a poller waiting, this should sync-match successfully. + // forwardInfo being set is what marks this task as forwarded from another partition. + _, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{ + taskInfo: &persistencespb.TaskInfo{ + NamespaceId: namespaceID, + RunId: "run", + WorkflowId: "wf", + }, + forwardInfo: &taskqueuespb.TaskForwardInfo{SourcePartition: "child-partition"}, + }) + s.Require().NoError(err) + s.Require().True(syncMatched) + + // Drain the poller goroutine and verify it received the task. + var pr pollResult + s.Require().Eventually(func() bool { + select { + case pr = <-pollDone: + return true + default: + return false + } + }, 2*time.Second, 10*time.Millisecond) + s.Require().NoError(pr.err) + s.Require().NotNil(pr.task) + + // Hooks should NOT have been called on the parent — the child partition that + // originated the forwarded task is responsible for firing hooks. + s.Require().Empty(hook.getCalls()) +} + +func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedNoSyncMatch_HooksNotInvoked() { + // When a forwarded task fails to sync-match (no poller available), hooks should + // not fire on the parent. The errRemoteSyncMatchFailed return path already skips + // hooks since it exits AddTask before reaching processTaskAddHooks. + hook := &capturingTaskMatchHook{} + pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook}) + defer cleanup() + + // Add a forwarded task with no poller waiting — sync-match will fail. + _, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{ + taskInfo: &persistencespb.TaskInfo{ + NamespaceId: namespaceID, + RunId: "run", + WorkflowId: "wf", + }, + forwardInfo: &taskqueuespb.TaskForwardInfo{SourcePartition: "child-partition"}, + }) + s.Require().Equal(errRemoteSyncMatchFailed, err) + s.Require().False(syncMatched) + + // Hooks should NOT have been called on the parent partition. + s.Require().Empty(hook.getCalls()) +} + func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { hook1 := &capturingTaskMatchHook{} hook2 := &capturingTaskMatchHook{}