Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion service/matching/hooks/task_lifecycle_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ import (
"go.temporal.io/server/common/tqid"
)

// SyncMatchOutcome describes the outcome of a sync match attempt from the hook's perspective.
// It is the type of the SyncMatchOutcome field carried by TaskAddHookDetails.
type SyncMatchOutcome int

// Values of SyncMatchOutcome. They are numbered by iota in declaration order
// (Unspecified is 0), so reordering the entries changes their numeric values.
const (
// SyncMatchOutcomeUnspecified is the default zero value; it should not be used explicitly.
SyncMatchOutcomeUnspecified SyncMatchOutcome = iota
// SyncMatchOutcomeNotMatched means the task was not sync-matched. It is the catch-all
// for reasons not covered by a more specific outcome.
SyncMatchOutcomeNotMatched
// SyncMatchOutcomeSuccess means the task was sync-matched successfully.
SyncMatchOutcomeSuccess
// SyncMatchOutcomeRateLimited means a poller was available but rate limiting blocked the match.
SyncMatchOutcomeRateLimited
)

type (
// TaskQueuePartition is a simplified version of tqid.Partition that removes details
// the hooks should not concern themselves with
Expand All @@ -25,7 +39,8 @@ type (
}
TaskAddHookDetails struct {
DeploymentVersion *deploymentpb.WorkerDeploymentVersion
IsSyncMatch bool
IsSyncMatch bool // Deprecated: use SyncMatchOutcome instead.
SyncMatchOutcome SyncMatchOutcome
}

TaskHookFactory interface {
Expand Down
15 changes: 10 additions & 5 deletions service/matching/matcher_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
syncMatchBacklogged
// Sync match was attempted but no poller was available.
syncMatchNoPoller
// A poller was available but rate limiting blocked the match.
syncMatchRateLimited
)

type taskForwarderType int32
Expand Down Expand Up @@ -389,12 +391,15 @@ func (d *matcherData) MatchTaskImmediately(task *internalTask) syncMatchOutcome

task.initMatch(d)
d.tasks.Add(task)
d.findAndWakeMatches()
rateLimited := d.findAndWakeMatches()
// don't wait, check if match() picked this one already
if task.matchResult != nil {
return syncMatchSuccess
}
d.tasks.Remove(task)
if rateLimited {
return syncMatchRateLimited
}
return syncMatchNoPoller
}

Expand Down Expand Up @@ -504,8 +509,8 @@ func (d *matcherData) allowForwarding() (allowForwarding bool) {
return delayToForwardingAllowed <= 0
}

// call with lock held
func (d *matcherData) findAndWakeMatches() {
// call with lock held. Returns true if a match was found but blocked by rate limiting.
func (d *matcherData) findAndWakeMatches() (rateLimited bool) {
allowForwarding := d.canForward && d.allowForwarding()

now := d.timeSource.Now().UnixNano()
Expand All @@ -517,14 +522,14 @@ func (d *matcherData) findAndWakeMatches() {
if task == nil || poller == nil {
// no more current matches, stop rate limit timer if was running
d.rateLimitTimer.unset()
return
return false
}

// check ready time
delay := d.rateLimitManager.readyTimeForTask(task).delay(now)
d.rateLimitTimer.set(d.timeSource, d.rematchAfterTimer, delay)
if delay > 0 {
return // not ready yet, timer will call match later
return true // not ready yet, timer will call match later
}

// ready to signal match
Expand Down
17 changes: 17 additions & 0 deletions service/matching/matcher_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ func (s *MatcherDataSuite) TestMatchTaskImmediately() {
s.Equal(t, pres.task)
}

// TestMatchTaskImmediatelyRateLimited checks that MatchTaskImmediately reports
// syncMatchRateLimited when a poller is waiting but the rate limit is set to zero.
func (s *MatcherDataSuite) TestMatchTaskImmediatelyRateLimited() {
	// Zero RPS combined with zero burst: every match is blocked by the rate limiter.
	s.md.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API)
	s.md.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0)

	// Park one poller in the background and wait until the matcher sees it.
	go func() {
		s.md.EnqueuePollerAndWait(nil, &waitingPoller{startTime: s.now()})
	}()
	s.waitForPollers(1)

	// With a poller present, the reported reason must be rate limiting rather than
	// the absence of a poller.
	syncTask := s.newSyncTask(nil)
	outcome := s.md.MatchTaskImmediately(syncTask)
	s.Equal(syncMatchRateLimited, outcome)
}

func (s *MatcherDataSuite) TestMatchTaskImmediatelyDisabledBacklog() {
// register some backlog with old tasks
s.md.EnqueueTaskNoWait(s.newBacklogTask(123, 10*time.Minute, nil))
Expand Down
4 changes: 2 additions & 2 deletions service/matching/physical_task_queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestReaderSignaling(t *testing.T) {
task := newInternalTaskForSyncMatch(&persistencespb.TaskInfo{
CreateTime: timestamp.TimePtr(time.Now().UTC()),
}, nil)
sync, err := s.tqMgr.TrySyncMatch(context.TODO(), task)
outcome, err := s.tqMgr.TrySyncMatch(context.TODO(), task)
require.NoError(t, err)
require.True(t, sync)
require.Equal(t, syncMatchSuccess, outcome)
require.Len(t, readerNotifications, 0,
"Sync match should not signal taskReader")
}
Expand Down
25 changes: 20 additions & 5 deletions service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,15 @@ reredirectTask:
return "", false, err
}

var outcome syncMatchOutcome
if isActive {
var outcome syncMatchOutcome
outcome, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask)
syncMatched = outcome == syncMatchSuccess
if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) {
// Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired
// on the child partition that originally received the task.
if params.forwardInfo == nil {
pm.processTaskAddHooks(ctx, targetVersion, syncMatched)
pm.processTaskAddHooks(ctx, targetVersion, outcome)
}

// Build ID is not returned for sync match. The returned build ID is used by History to update
Expand Down Expand Up @@ -476,17 +476,32 @@ reredirectTask:

err = spoolQueue.SpoolTask(params.taskInfo)
if err == nil {
pm.processTaskAddHooks(ctx, targetVersion, false)
pm.processTaskAddHooks(ctx, targetVersion, outcome)
}

return assignedBuildId, false, err
}

func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, syncMatched bool) {
// syncMatchOutcomeToHook translates the matcher-internal syncMatchOutcome into the
// hooks.SyncMatchOutcome value that is handed to task hooks.
//
// Success, rate-limited and unspecified map to their direct counterparts; any other
// internal outcome is reported as hooks.SyncMatchOutcomeNotMatched.
func syncMatchOutcomeToHook(outcome syncMatchOutcome) hooks.SyncMatchOutcome {
	var translated hooks.SyncMatchOutcome
	switch outcome {
	case syncMatchUnspecified:
		translated = hooks.SyncMatchOutcomeUnspecified
	case syncMatchRateLimited:
		translated = hooks.SyncMatchOutcomeRateLimited
	case syncMatchSuccess:
		translated = hooks.SyncMatchOutcomeSuccess
	default:
		// Catch-all for every internal outcome without a dedicated hook value.
		translated = hooks.SyncMatchOutcomeNotMatched
	}
	return translated
}

// processTaskAddHooks calls ProcessTaskAdd on every hook in pm.taskHooks, passing the
// externalized target deployment version and the sync match outcome of the added task.
//
// outcome is the matcher-internal result; it is converted with syncMatchOutcomeToHook
// before being exposed to hooks. The deprecated IsSyncMatch field is still populated
// (true only for a successful sync match) so hooks that have not migrated to
// SyncMatchOutcome keep working.
func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, outcome syncMatchOutcome) {
	// The hook-facing outcome is the same for every hook, so translate it once
	// instead of once per loop iteration.
	hookOutcome := syncMatchOutcomeToHook(outcome)
	for _, l := range pm.taskHooks {
		l.ProcessTaskAdd(ctx, &hooks.TaskAddHookDetails{
			DeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromVersion(targetVersion),
			IsSyncMatch:       hookOutcome == hooks.SyncMatchOutcomeSuccess,
			SyncMatchOutcome:  hookOutcome,
		})
	}
}
Expand Down
83 changes: 75 additions & 8 deletions service/matching/task_queue_partition_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ type capturingTaskMatchHook struct {
type capturedTaskMatchDetails struct {
TaskQueueName string
TaskQueueType enumspb.TaskQueueType
IsSyncMatch bool
SyncMatchOutcome hooks.SyncMatchOutcome
DeploymentVersion *deploymentpb.WorkerDeploymentVersion
}

Expand All @@ -1370,9 +1370,9 @@ func (h *capturingTaskMatchHook) ProcessTaskAdd(ctx context.Context, event *hook
h.mu.Lock()
defer h.mu.Unlock()
details := capturedTaskMatchDetails{
TaskQueueName: h.taskQueueName,
TaskQueueType: h.taskQueueType,
IsSyncMatch: event.IsSyncMatch,
TaskQueueName: h.taskQueueName,
TaskQueueType: h.taskQueueType,
SyncMatchOutcome: event.SyncMatchOutcome,
}
if event.DeploymentVersion != nil {
details.DeploymentVersion = &deploymentpb.WorkerDeploymentVersion{
Expand Down Expand Up @@ -1610,7 +1610,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookSyncMatch() {
s.Require().Len(calls, 1)
s.Equal(taskQueueName, calls[0].TaskQueueName)
s.Equal(enumspb.TASK_QUEUE_TYPE_WORKFLOW, calls[0].TaskQueueType)
s.True(calls[0].IsSyncMatch)
s.Equal(hooks.SyncMatchOutcomeSuccess, calls[0].SyncMatchOutcome)
s.Nil(calls[0].DeploymentVersion)
}

Expand All @@ -1634,7 +1634,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookNoSyncMatch() {
s.Require().Len(calls, 1)
s.Equal(taskQueueName, calls[0].TaskQueueName)
s.Equal(enumspb.TASK_QUEUE_TYPE_WORKFLOW, calls[0].TaskQueueType)
s.False(calls[0].IsSyncMatch)
s.Equal(hooks.SyncMatchOutcomeNotMatched, calls[0].SyncMatchOutcome)
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedSyncMatch_HooksNotInvoked() {
Expand Down Expand Up @@ -1727,6 +1727,73 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedNoSyncMatch_HooksN
s.Require().Empty(hook.getCalls())
}

// TestTaskAddHooks_RateLimited verifies that when a poller is waiting but the rate limit
// is zero, AddTask does not sync-match and the task-add hook is invoked exactly once with
// hooks.SyncMatchOutcomeRateLimited. The test is skipped unless s.newMatcher is set.
func (s *PartitionManagerTestSuite) TestTaskAddHooks_RateLimited() {
if !s.newMatcher {
s.T().Skip("rate limiting signal from matcher is only available in new matcher")
}
hook := &capturingTaskMatchHook{}
pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook})
defer cleanup()

// Set rate limit to zero RPS — this blocks all sync matches due to rate limiting.
pm.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API)
pm.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0)

// Set up a waiting poller so sync match would succeed if not rate-limited.
// The poll runs in the background with a 200ms timeout; its error result is ignored.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
task, _, _ := pm.PollTask(ctx, &pollMetadata{
workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "",
UseVersioning: false,
},
})
// If a task is delivered anyway, close its response channel
// (presumably to release whoever waits on it — confirm against internalTask).
if task != nil && task.responseC != nil {
close(task.responseC)
}
}()
// Wait (up to 2s, checking every millisecond) until the matcher reports a waiting poller.
pq := pm.defaultQueue().(*physicalTaskQueueManagerImpl)
s.Require().Eventually(pq.matcher.HasWaitingPoller, 2*time.Second, time.Millisecond)

// AddTask should fall through to spool because rate limiting blocked sync match.
_, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{
taskInfo: &persistencespb.TaskInfo{
NamespaceId: namespaceID,
RunId: "run",
WorkflowId: "wf",
},
})
s.Require().NoError(err)
s.Require().False(syncMatched)

// Exactly one hook invocation, carrying the rate-limited outcome.
calls := hook.getCalls()
s.Require().Len(calls, 1)
s.Equal(hooks.SyncMatchOutcomeRateLimited, calls[0].SyncMatchOutcome)
}

// TestTaskAddHooks_NotRateLimited checks that, when no rate limit is configured by the
// test and AddTask does not sync-match, the task-add hook is invoked once with
// hooks.SyncMatchOutcomeNotMatched rather than the rate-limited outcome.
func (s *PartitionManagerTestSuite) TestTaskAddHooks_NotRateLimited() {
	capture := &capturingTaskMatchHook{}
	pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{capture})
	defer cleanup()

	// The test leaves the rate limiter untouched, so the task is expected to spool
	// normally and must not be reported as rate limited.
	taskInfo := &persistencespb.TaskInfo{
		NamespaceId:      namespaceID,
		RunId:            "run",
		WorkflowId:       "wf",
		VersionDirective: worker_versioning.MakeBuildIdDirective("buildXYZ"),
	}
	_, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{taskInfo: taskInfo})
	s.Require().NoError(err)
	s.Require().False(syncMatched)

	recorded := capture.getCalls()
	s.Require().Len(recorded, 1)
	s.Equal(hooks.SyncMatchOutcomeNotMatched, recorded[0].SyncMatchOutcome)
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() {
hook1 := &capturingTaskMatchHook{}
hook2 := &capturingTaskMatchHook{}
Expand All @@ -1744,8 +1811,8 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() {

s.Len(hook1.getCalls(), 1)
s.Len(hook2.getCalls(), 1)
s.False(hook1.getCalls()[0].IsSyncMatch)
s.False(hook2.getCalls()[0].IsSyncMatch)
s.Equal(hooks.SyncMatchOutcomeNotMatched, hook1.getCalls()[0].SyncMatchOutcome)
s.Equal(hooks.SyncMatchOutcomeNotMatched, hook2.getCalls()[0].SyncMatchOutcome)
}

type mockUserDataManager struct {
Expand Down
Loading