Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type (
AddActivityTaskCancelRequestedEvent(int64, int64, string) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error)
AddWorkerCommandsTasks(commands []*workerpb.WorkerCommand, controlQueue string) error
GenerateActivityCancelCommandsForClose() error
AddActivityTaskCompletedEvent(int64, int64, *workflowservice.RespondActivityTaskCompletedRequest) (*historypb.HistoryEvent, error)
AddActivityTaskFailedEvent(int64, int64, *failurepb.Failure, enumspb.RetryState, string, *commonpb.WorkerVersionStamp) (*historypb.HistoryEvent, error)
AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes, bool) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
Expand Down
14 changes: 14 additions & 0 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions service/history/ndc/events_reapplier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_Termination(
false,
nil,
).Return(nil, nil)
msCurrent.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)
events := []*historypb.HistoryEvent{
{EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED},
event,
Expand Down
2 changes: 2 additions & 0 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() {
false,
nil,
).Return(&historypb.HistoryEvent{}, nil)
mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)

err := s.workflowResetter.terminateWorkflow(mutableState, terminateReason)
s.NoError(err)
Expand Down Expand Up @@ -1255,6 +1256,7 @@ func (s *workflowResetterSuite) TestReapplyEvents() {
false,
event.Links,
).Return(&historypb.HistoryEvent{}, nil)
ms.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)
}
}
}
Expand Down
63 changes: 63 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"go.temporal.io/server/common/searchattribute/sadefs"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/common/tasktoken"
"go.temporal.io/server/common/util"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/components/callbacks"
Expand Down Expand Up @@ -4556,6 +4557,68 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo
return ms.taskGenerator.GenerateWorkerCommandsTasks(commands, controlQueue)
}

// GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all
// in-flight activities that have a worker control queue. Called when the workflow is being
// terminated (or otherwise forcefully closed) to proactively notify workers.
func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error {
serializer := tasktoken.NewSerializer()
wfKey := ms.GetWorkflowKey()
nsID := ms.GetNamespaceEntry().ID().String()

commandsByQueue := make(map[string][]*workerpb.WorkerCommand)
for _, ai := range ms.pendingActivityInfoIDs {
if ai.WorkerControlTaskQueue == "" {
continue
}
if ai.StartedClock == nil {
// StartedClock may be nil for activities started before this feature was deployed.
// Skip cancel command; the activity will time out normally.
ms.logger.Debug("Skipping worker cancel command: activity missing StartedClock (pre-deploy)",
tag.WorkflowNamespaceID(wfKey.NamespaceID),
tag.WorkflowID(wfKey.WorkflowID),
tag.WorkflowRunID(wfKey.RunID),
tag.WorkflowScheduledEventID(ai.ScheduledEventId),
)
continue
}

taskToken, err := serializer.Serialize(tasktoken.NewActivityTaskToken(
nsID,
wfKey.WorkflowID,
wfKey.RunID,
ai.ScheduledEventId,
ai.ActivityId,
ai.ActivityType.GetName(),
ai.Attempt,
ai.StartedClock,
ai.Version,
ai.StartVersion,
nil,
))
if err != nil {
return err
}

commandsByQueue[ai.WorkerControlTaskQueue] = append(
commandsByQueue[ai.WorkerControlTaskQueue],
&workerpb.WorkerCommand{
Type: &workerpb.WorkerCommand_CancelActivity{
CancelActivity: &workerpb.CancelActivityCommand{
TaskToken: taskToken,
},
},
},
)
}

for controlQueue, commands := range commandsByQueue {
if err := ms.AddWorkerCommandsTasks(commands, controlQueue); err != nil {
return err
}
}
return nil
}

func (ms *MutableStateImpl) ApplyActivityTaskCancelRequestedEvent(
event *historypb.HistoryEvent,
) error {
Expand Down
158 changes: 158 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7285,3 +7285,161 @@ func (s *mutableStateSuite) TestApplyWorkflowExecutionOptionsUpdatedEvent_TimeSk
})
}
}

func TestGenerateActivityCancelCommandsForClose(t *testing.T) {
t.Parallel()

startedClock := &clockspb.VectorClock{ShardId: 1, Clock: 100}

testCases := []struct {
name string
featureEnabled bool
activities map[int64]*persistencespb.ActivityInfo
expectedQueues map[string]int // controlQueue -> expected command count
expectedNoTasks bool
}{
{
name: "activities with control queue and started clock",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "control-queue-1",
StartedClock: startedClock,
Attempt: 1,
},
},
expectedQueues: map[string]int{"control-queue-1": 1},
},
{
name: "skips activities without control queue",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
StartedClock: startedClock,
Attempt: 1,
},
},
expectedNoTasks: true,
},
{
name: "skips activities without started clock",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "control-queue-1",
Attempt: 1,
},
},
expectedNoTasks: true,
},
{
name: "multiple activities batched by control queue",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "queue-A",
StartedClock: startedClock,
Attempt: 1,
},
2: {
ScheduledEventId: 2,
ActivityId: "act-2",
ActivityType: &commonpb.ActivityType{Name: "type2"},
WorkerControlTaskQueue: "queue-A",
StartedClock: startedClock,
Attempt: 1,
},
3: {
ScheduledEventId: 3,
ActivityId: "act-3",
ActivityType: &commonpb.ActivityType{Name: "type3"},
WorkerControlTaskQueue: "queue-B",
StartedClock: startedClock,
Attempt: 1,
},
},
expectedQueues: map[string]int{"queue-A": 2, "queue-B": 1},
},
{
name: "feature flag disabled - no tasks generated",
featureEnabled: false,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "control-queue-1",
StartedClock: startedClock,
Attempt: 1,
},
},
expectedNoTasks: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockEventsCache := events.NewMockCache(ctrl)
mockConfig := tests.NewDynamicConfig()
mockConfig.EnableCancelActivityWorkerCommand = dynamicconfig.GetBoolPropertyFn(tc.featureEnabled)

mockShard := shard.NewTestContext(
ctrl,
&persistencespb.ShardInfo{ShardId: 0, RangeId: 1},
mockConfig,
)
defer mockShard.StopForTest()
reg := hsm.NewRegistry()
require.NoError(t, RegisterStateMachine(reg))
require.NoError(t, callbacks.RegisterStateMachine(reg))
require.NoError(t, nexusoperations.RegisterStateMachines(reg))
mockShard.SetStateMachineRegistry(reg)
mockShard.SetEventsCacheForTesting(mockEventsCache)

namespaceEntry := tests.GlobalNamespaceEntry
mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes()
mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(cluster.TestCurrentClusterName).AnyTimes()
mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes()

ms := NewMutableState(mockShard, mockEventsCache, log.NewTestLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC())
ms.pendingActivityInfoIDs = tc.activities

err := ms.GenerateActivityCancelCommandsForClose()
require.NoError(t, err)

if tc.expectedNoTasks {
require.Empty(t, ms.InsertTasks[tasks.CategoryOutbound])
return
}

// Verify tasks were generated by checking outbound task messages
var workerCommandTasks []*tasks.WorkerCommandsTask
for _, task := range ms.InsertTasks[tasks.CategoryOutbound] {
if wct, ok := task.(*tasks.WorkerCommandsTask); ok {
workerCommandTasks = append(workerCommandTasks, wct)
}
}

// Verify each expected queue got the right number of commands
tasksByQueue := make(map[string]int)
for _, wct := range workerCommandTasks {
tasksByQueue[wct.Destination] = len(wct.Commands)
}
require.Equal(t, tc.expectedQueues, tasksByQueue)
})
}
}
13 changes: 11 additions & 2 deletions service/history/workflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func TimeoutWorkflow(
retryState,
continuedRunID,
)
return err
if err != nil {
return err
}

return mutableState.GenerateActivityCancelCommandsForClose()
}

// TerminateWorkflow will write a WorkflowExecutionTerminated event with a fresh
Expand Down Expand Up @@ -142,8 +146,13 @@ func TerminateWorkflow(
deleteAfterTerminate,
links,
)
if err != nil {
return err
}

return err
// Proactively cancel in-flight activities by dispatching worker commands to workers
// that support control queues, so they don't run uselessly after the workflow is closed.
return mutableState.GenerateActivityCancelCommandsForClose()
}

// FindAutoResetPoint returns the auto reset point
Expand Down
Loading
Loading