Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions service/matching/hooks/task_lifecycle_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ import (
"go.temporal.io/server/common/tqid"
)

// NoMatchReason describes why a task was not sync-matched.
type NoMatchReason int

const (
// No specific reason provided.
NoMatchReasonUnspecified NoMatchReason = iota
// A poller was available but rate limiting blocked the match.
NoMatchReasonRateLimited
)

type (
// TaskQueuePartition is a simplified version of tqid.Partition that removes details
// the hooks should not concern themselves with
Expand All @@ -26,6 +36,7 @@ type (
TaskAddHookDetails struct {
DeploymentVersion *deploymentpb.WorkerDeploymentVersion
IsSyncMatch bool
NoMatchReason NoMatchReason
}

TaskHookFactory interface {
Expand Down
15 changes: 10 additions & 5 deletions service/matching/matcher_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ const (
NoMatchReasonBacklogged
// Sync match was attempted but no poller was available.
NoMatchReasonNoPoller
// A poller was available but rate limiting blocked the match.
NoMatchReasonRateLimited
)

// MatchTaskImmediately attempts a non-blocking sync match. Returns whether the task was matched
Expand All @@ -388,12 +390,15 @@ func (d *matcherData) MatchTaskImmediately(task *internalTask) syncMatchResult {

task.initMatch(d)
d.tasks.Add(task)
d.findAndWakeMatches()
rateLimited := d.findAndWakeMatches()
// don't wait, check if match() picked this one already
if task.matchResult != nil {
return syncMatchResult{matched: true}
}
d.tasks.Remove(task)
if rateLimited {
return syncMatchResult{reason: NoMatchReasonRateLimited}
}
return syncMatchResult{reason: NoMatchReasonNoPoller}
}

Expand Down Expand Up @@ -503,8 +508,8 @@ func (d *matcherData) allowForwarding() (allowForwarding bool) {
return delayToForwardingAllowed <= 0
}

// call with lock held
func (d *matcherData) findAndWakeMatches() {
// call with lock held. Returns true if a match was found but blocked by rate limiting.
func (d *matcherData) findAndWakeMatches() (rateLimited bool) {
allowForwarding := d.canForward && d.allowForwarding()

now := d.timeSource.Now().UnixNano()
Expand All @@ -516,14 +521,14 @@ func (d *matcherData) findAndWakeMatches() {
if task == nil || poller == nil {
// no more current matches, stop rate limit timer if was running
d.rateLimitTimer.unset()
return
return false
}

// check ready time
delay := d.rateLimitManager.readyTimeForTask(task).delay(now)
d.rateLimitTimer.set(d.timeSource, d.rematchAfterTimer, delay)
if delay > 0 {
return // not ready yet, timer will call match later
return true // not ready yet, timer will call match later
}

// ready to signal match
Expand Down
19 changes: 19 additions & 0 deletions service/matching/matcher_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,25 @@ func (s *MatcherDataSuite) TestMatchTaskImmediately() {
s.Equal(t, pres.task)
}

func (s *MatcherDataSuite) TestMatchTaskImmediatelyRateLimited() {
// Set rate limit to zero — blocks all matches.
s.md.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API)
s.md.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0)

// Add a waiting poller.
go func() {
poller := &waitingPoller{startTime: s.now()}
s.md.EnqueuePollerAndWait(nil, poller)
}()
s.waitForPollers(1)

// Sync match should fail due to rate limiting, not lack of poller.
t := s.newSyncTask(nil)
result := s.md.MatchTaskImmediately(t)
s.False(result.matched)
s.Equal(NoMatchReasonRateLimited, result.reason)
}

func (s *MatcherDataSuite) TestMatchTaskImmediatelyDisabledBacklog() {
// register some backlog with old tasks
s.md.EnqueueTaskNoWait(s.newBacklogTask(123, 10*time.Minute, nil))
Expand Down
4 changes: 2 additions & 2 deletions service/matching/physical_task_queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestReaderSignaling(t *testing.T) {
task := newInternalTaskForSyncMatch(&persistencespb.TaskInfo{
CreateTime: timestamp.TimePtr(time.Now().UTC()),
}, nil)
sync, err := s.tqMgr.TrySyncMatch(context.TODO(), task)
syncResult, err := s.tqMgr.TrySyncMatch(context.TODO(), task)
require.NoError(t, err)
require.True(t, sync)
require.True(t, syncResult.matched)
require.Len(t, readerNotifications, 0,
"Sync match should not signal taskReader")
}
Expand Down
21 changes: 17 additions & 4 deletions service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,18 @@ reredirectTask:
return "", false, err
}

var result syncMatchResult
if isActive {
var result syncMatchResult
result, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask)
syncMatched = result.matched
// TODO: TrySyncMatch returns matched=true with a start error (e.g. busy workflow),
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ShahabT @dnr See if this makes sense.

// but the task is not actually dispatched. It should return matched=false with an
// appropriate NoMatchReason so callers don't have to re-check errors.
if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) {
// Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired
// on the child partition that originally received the task.
if params.forwardInfo == nil {
pm.processTaskAddHooks(ctx, targetVersion, syncMatched)
pm.processTaskAddHooks(ctx, targetVersion, syncMatched, result.reason)
}

// Build ID is not returned for sync match. The returned build ID is used by History to update
Expand Down Expand Up @@ -476,17 +479,27 @@ reredirectTask:

err = spoolQueue.SpoolTask(params.taskInfo)
if err == nil {
pm.processTaskAddHooks(ctx, targetVersion, false)
pm.processTaskAddHooks(ctx, targetVersion, false, result.reason)
}

return assignedBuildId, false, err
}

func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, syncMatched bool) {
func noMatchReasonToHook(reason NoMatchReason) hooks.NoMatchReason {
switch reason {
case NoMatchReasonRateLimited:
return hooks.NoMatchReasonRateLimited
default:
return hooks.NoMatchReasonUnspecified
}
}

func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, syncMatched bool, reason NoMatchReason) {
for _, l := range pm.taskHooks {
l.ProcessTaskAdd(ctx, &hooks.TaskAddHookDetails{
DeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromVersion(targetVersion),
IsSyncMatch: syncMatched,
NoMatchReason: noMatchReasonToHook(reason),
})
}
}
Expand Down
71 changes: 71 additions & 0 deletions service/matching/task_queue_partition_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,7 @@ type capturedTaskMatchDetails struct {
TaskQueueName string
TaskQueueType enumspb.TaskQueueType
IsSyncMatch bool
NoMatchReason hooks.NoMatchReason
DeploymentVersion *deploymentpb.WorkerDeploymentVersion
}

Expand All @@ -1373,6 +1374,7 @@ func (h *capturingTaskMatchHook) ProcessTaskAdd(ctx context.Context, event *hook
TaskQueueName: h.taskQueueName,
TaskQueueType: h.taskQueueType,
IsSyncMatch: event.IsSyncMatch,
NoMatchReason: event.NoMatchReason,
}
if event.DeploymentVersion != nil {
details.DeploymentVersion = &deploymentpb.WorkerDeploymentVersion{
Expand Down Expand Up @@ -1727,6 +1729,75 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedNoSyncMatch_HooksN
s.Require().Empty(hook.getCalls())
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_RateLimited() {
if !s.newMatcher {
s.T().Skip("rate limiting signal from matcher is only available in new matcher")
}
hook := &capturingTaskMatchHook{}
pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook})
defer cleanup()

// Set rate limit to zero RPS — this blocks all sync matches due to rate limiting.
pm.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API)
pm.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0)

// Set up a waiting poller so sync match would succeed if not rate-limited.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
task, _, _ := pm.PollTask(ctx, &pollMetadata{
workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "",
UseVersioning: false,
},
})
if task != nil && task.responseC != nil {
close(task.responseC)
}
}()
pq := pm.defaultQueue().(*physicalTaskQueueManagerImpl)
s.Require().Eventually(pq.matcher.HasWaitingPoller, 2*time.Second, time.Millisecond)

// AddTask should fall through to spool because rate limiting blocked sync match.
_, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{
taskInfo: &persistencespb.TaskInfo{
NamespaceId: namespaceID,
RunId: "run",
WorkflowId: "wf",
},
})
s.Require().NoError(err)
s.Require().False(syncMatched)

calls := hook.getCalls()
s.Require().Len(calls, 1)
s.False(calls[0].IsSyncMatch)
s.Equal(hooks.NoMatchReasonRateLimited, calls[0].NoMatchReason)
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_NotRateLimited() {
hook := &capturingTaskMatchHook{}
pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook})
defer cleanup()

// No rate limiting configured — task should spool normally without rate limit flag.
_, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{
taskInfo: &persistencespb.TaskInfo{
NamespaceId: namespaceID,
RunId: "run",
WorkflowId: "wf",
VersionDirective: worker_versioning.MakeBuildIdDirective("buildXYZ"),
},
})
s.Require().NoError(err)
s.Require().False(syncMatched)

calls := hook.getCalls()
s.Require().Len(calls, 1)
s.False(calls[0].IsSyncMatch)
s.Equal(hooks.NoMatchReasonUnspecified, calls[0].NoMatchReason)
}

func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() {
hook1 := &capturingTaskMatchHook{}
hook2 := &capturingTaskMatchHook{}
Expand Down
Loading