diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 3dd134c0f26..385cebf9ac6 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -713,9 +713,11 @@ func (a *Activity) RecordHeartbeat( if err != nil { return nil, err } + prevHeartbeat, _ := a.LastHeartbeat.TryGet(ctx) a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ - RecordedTime: timestamppb.New(ctx.Now(a)), - Details: input.Request.GetHeartbeatRequest().GetDetails(), + RecordedTime: timestamppb.New(ctx.Now(a)), + Details: input.Request.GetHeartbeatRequest().GetDetails(), + TotalHeartbeatCount: prevHeartbeat.GetTotalHeartbeatCount() + 1, }) if heartbeatTimeout := a.GetHeartbeatTimeout().AsDuration(); heartbeatTimeout > 0 { ctx.AddTask( @@ -817,6 +819,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb. Header: requestData.GetHeader(), HeartbeatDetails: heartbeat.GetDetails(), HeartbeatTimeout: a.GetHeartbeatTimeout(), + TotalHeartbeatCount: heartbeat.GetTotalHeartbeatCount(), LastAttemptCompleteTime: attempt.GetCompleteTime(), LastFailure: attempt.GetLastFailureDetails().GetFailure(), LastHeartbeatTime: heartbeat.GetRecordedTime(), diff --git a/chasm/lib/activity/activity_test.go b/chasm/lib/activity/activity_test.go index b614c3eba7a..0f5f1c2cbfd 100644 --- a/chasm/lib/activity/activity_test.go +++ b/chasm/lib/activity/activity_test.go @@ -10,7 +10,7 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/chasm" - "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + activitypb "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/namespace" serviceerrors "go.temporal.io/server/common/serviceerror" diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go index 3bc4c2d3aff..3e95ee84f59 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go @@ -558,9 +558,11 @@ type ActivityHeartbeatState struct { // Details provided in the last recorded activity heartbeat. Details *v1.Payloads `protobuf:"bytes,1,opt,name=details,proto3" json:"details,omitempty"` // Time the last heartbeat was recorded. - RecordedTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=recorded_time,json=recordedTime,proto3" json:"recorded_time,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + RecordedTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=recorded_time,json=recordedTime,proto3" json:"recorded_time,omitempty"` + // Total number of heartbeats recorded across all attempts of this activity, including retries. + TotalHeartbeatCount int64 `protobuf:"varint,3,opt,name=total_heartbeat_count,json=totalHeartbeatCount,proto3" json:"total_heartbeat_count,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ActivityHeartbeatState) Reset() { @@ -607,6 +609,13 @@ func (x *ActivityHeartbeatState) GetRecordedTime() *timestamppb.Timestamp { return nil } +func (x *ActivityHeartbeatState) GetTotalHeartbeatCount() int64 { + if x != nil { + return x.TotalHeartbeatCount + } + return 0 +} + type ActivityRequestData struct { state protoimpl.MessageState `protogen:"open.v1"` // Serialized activity input, passed as arguments to the activity function. @@ -938,10 +947,11 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\x10start_request_id\x18\t \x01(\tR\x0estartRequestId\x1a\x80\x01\n" + "\x12LastFailureDetails\x12.\n" + "\x04time\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\x04time\x12:\n" + - "\afailure\x18\x02 \x01(\v2 .temporal.api.failure.v1.FailureR\afailure\"\x95\x01\n" + + "\afailure\x18\x02 \x01(\v2 .temporal.api.failure.v1.FailureR\afailure\"\xc9\x01\n" + "\x16ActivityHeartbeatState\x12:\n" + "\adetails\x18\x01 \x01(\v2 .temporal.api.common.v1.PayloadsR\adetails\x12?\n" + - "\rrecorded_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\frecordedTime\"\xcd\x01\n" + + "\rrecorded_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\frecordedTime\x122\n" + + "\x15total_heartbeat_count\x18\x03 \x01(\x03R\x13totalHeartbeatCount\"\xcd\x01\n" + "\x13ActivityRequestData\x126\n" + "\x05input\x18\x01 \x01(\v2 .temporal.api.common.v1.PayloadsR\x05input\x126\n" + "\x06header\x18\x02 \x01(\v2\x1e.temporal.api.common.v1.HeaderR\x06header\x12F\n" + diff --git a/chasm/lib/activity/proto/v1/activity_state.proto b/chasm/lib/activity/proto/v1/activity_state.proto index 7f3e8c06b34..931afb0b881 100644 --- a/chasm/lib/activity/proto/v1/activity_state.proto +++ b/chasm/lib/activity/proto/v1/activity_state.proto @@ -162,6 +162,8 @@ message ActivityHeartbeatState { temporal.api.common.v1.Payloads details = 1; // Time the last heartbeat was recorded. google.protobuf.Timestamp recorded_time = 2; + // Total number of heartbeats recorded across all attempts of this activity, including retries. + int64 total_heartbeat_count = 3; } message ActivityRequestData { diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 9e674fdf30c..9b2864c6cd2 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -1326,6 +1326,7 @@ func (s *standaloneActivityTestSuite) TestRequestCancel() { require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus(), "expected Canceled but is %s", info.GetStatus()) require.Equal(t, "Test Cancellation", info.GetCanceledReason()) + require.Equal(t, int64(1), info.GetTotalHeartbeatCount(), "total heartbeat count") require.Greater(t, info.GetExecutionDuration().AsDuration(), time.Duration(0)) require.NotNil(t, info.GetCloseTime()) protorequire.ProtoEqual(t, details, activityResp.GetOutcome().GetFailure().GetCanceledFailureInfo().GetDetails()) @@ -1411,6 +1412,7 @@ func (s *standaloneActivityTestSuite) TestRequestCancel() { require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus(), "expected Canceled but is %s", info.GetStatus()) require.Equal(t, "Test Cancellation", info.GetCanceledReason()) + require.Equal(t, int64(1), info.GetTotalHeartbeatCount(), "total heartbeat count") protorequire.ProtoEqual(t, details, activityResp.GetOutcome().GetFailure().GetCanceledFailureInfo().GetDetails()) require.Equal(t, identity, activityResp.GetOutcome().GetFailure().GetCanceledFailureInfo().GetIdentity()) }) @@ -1485,6 +1487,7 @@ func (s *standaloneActivityTestSuite) TestRequestCancel() { require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, info.GetRunState(), "expected CancelRequested but is %s", info.GetRunState()) require.Equal(t, "Test Cancellation", info.GetCanceledReason()) + require.Equal(t, int64(1), info.GetTotalHeartbeatCount(), "total heartbeat count") }) t.Run("DifferentRequestIDFails", func(t *testing.T) { @@ -4648,6 +4651,22 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() { // Verify: heartbeat details from first attempt are available protorequire.ProtoEqual(t, heartbeatDetails, pollResp2.HeartbeatDetails) + + _, err = s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollResp2.TaskToken, + Details: heartbeatDetails, + }) + require.NoError(t, err) + + desc, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + IncludeOutcome: true, + }) + require.NoError(t, err) + + require.Equal(t, int64(2), desc.Info.GetTotalHeartbeatCount(), "total heartbeat count") }) t.Run("ActivityTimesOutWithoutHeartbeat", func(t *testing.T) { @@ -4793,6 +4812,7 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() { require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, pollResp.GetInfo().GetStatus(), "expected status=Completed but is %s", pollResp.GetInfo().GetStatus()) + require.Equal(t, int64(2), pollResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count") protorequire.ProtoEqual(t, defaultResult, pollResp.GetOutcome().GetResult()) }) @@ -4842,6 +4862,7 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() { require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, descResp.GetInfo().GetStatus(), "activity should still be running but is %s", descResp.GetInfo().GetStatus()) + require.Equal(t, int64(1), descResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count") // Complete the activity to confirm it's still operable. _, err = s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ @@ -4860,6 +4881,7 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() { require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, descResp.GetInfo().GetStatus(), "expected status=Completed but is %s", descResp.GetInfo().GetStatus()) + require.Equal(t, int64(1), descResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count") protorequire.ProtoEqual(t, defaultResult, descResp.GetOutcome().GetResult()) }) @@ -4918,8 +4940,47 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() { require.NoError(t, err) require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, pollResp.GetInfo().GetStatus(), "expected status=Completed but is %s", pollResp.GetInfo().GetStatus()) + require.Equal(t, int64(1), pollResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count") protorequire.ProtoEqual(t, defaultResult, pollResp.GetOutcome().GetResult()) }) + + t.Run("HeartbeatCountIncrementsPerHeartbeat", func(t *testing.T) { + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: s.tv.ActivityType(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + }) + require.NoError(t, err) + + pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + }) + require.NoError(t, err) + + const numHeartbeats = 5 + for i := range numHeartbeats { + _, err = s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollTaskResp.TaskToken, + Details: heartbeatDetails, + }) + require.NoError(t, err) + + descResp, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + require.Equal(t, int64(i+1), descResp.GetInfo().GetTotalHeartbeatCount(), "total heartbeat count after heartbeat %d", i+1) + } + }) } func (s *standaloneActivityTestSuite) TestStartDelay() {