Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,11 @@ reredirectTask:
if isActive {
syncMatched, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask)
if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) {
pm.processTaskAddHooks(ctx, targetVersion, syncMatched)
// Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired
// on the child partition that originally received the task.
if params.forwardInfo == nil {
pm.processTaskAddHooks(ctx, targetVersion, syncMatched)
}

// Build ID is not returned for sync match. The returned build ID is used by History to update
// mutable state (and visibility) when the first workflow task is spooled.
Expand Down
90 changes: 90 additions & 0 deletions service/matching/task_queue_partition_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,96 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookNoSyncMatch() {
s.False(calls[0].IsSyncMatch)
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedSyncMatch_HooksNotInvoked() {
// When a task is forwarded from a child partition and sync-matched on the parent,
// hooks should not fire on the parent because the child already fired them.
hook := &capturingTaskMatchHook{}
pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook})
defer cleanup()

type pollResult struct {
task *internalTask
err error
}
pollDone := make(chan pollResult, 1)

// Start a poller in a background goroutine so there's someone to sync-match with.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
task, _, err := pm.PollTask(ctx, &pollMetadata{
workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "",
UseVersioning: false,
},
})
pollDone <- pollResult{task: task, err: err}
if task != nil && task.responseC != nil {
close(task.responseC)
}
}()

// Wait until the poller is actually blocked in the matcher, ready to receive a task.
// This guarantees the subsequent AddTask will sync-match rather than spool.
pq := pm.defaultQueue().(*physicalTaskQueueManagerImpl)
s.Require().Eventually(pq.matcher.HasWaitingPoller, 2*time.Second, time.Millisecond)

// Add a forwarded task (simulating a child partition forwarding to this parent).
// With a poller waiting, this should sync-match successfully.
// forwardInfo being set is what marks this task as forwarded from another partition.
_, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{
taskInfo: &persistencespb.TaskInfo{
NamespaceId: namespaceID,
RunId: "run",
WorkflowId: "wf",
},
forwardInfo: &taskqueuespb.TaskForwardInfo{SourcePartition: "child-partition"},
})
s.Require().NoError(err)
s.Require().True(syncMatched)

// Drain the poller goroutine and verify it received the task.
var pr pollResult
s.Require().Eventually(func() bool {
select {
case pr = <-pollDone:
return true
default:
return false
}
}, 2*time.Second, 10*time.Millisecond)
s.Require().NoError(pr.err)
s.Require().NotNil(pr.task)

// Hooks should NOT have been called on the parent — the child partition that
// originated the forwarded task is responsible for firing hooks.
s.Require().Empty(hook.getCalls())
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedNoSyncMatch_HooksNotInvoked() {
// When a forwarded task fails to sync-match (no poller available), hooks should
// not fire on the parent. The errRemoteSyncMatchFailed return path already skips
// hooks since it exits AddTask before reaching processTaskAddHooks.
hook := &capturingTaskMatchHook{}
pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook})
defer cleanup()

// Add a forwarded task with no poller waiting — sync-match will fail.
_, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{
taskInfo: &persistencespb.TaskInfo{
NamespaceId: namespaceID,
RunId: "run",
WorkflowId: "wf",
},
forwardInfo: &taskqueuespb.TaskForwardInfo{SourcePartition: "child-partition"},
})
s.Require().Equal(errRemoteSyncMatchFailed, err)
s.Require().False(syncMatched)

// Hooks should NOT have been called on the parent partition.
s.Require().Empty(hook.getCalls())
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() {
hook1 := &capturingTaskMatchHook{}
hook2 := &capturingTaskMatchHook{}
Expand Down
Loading