diff --git a/common/persistence/cassandra/cluster_metadata_store.go b/common/persistence/cassandra/cluster_metadata_store.go index 432fb448af..dca7e9cbb2 100644 --- a/common/persistence/cassandra/cluster_metadata_store.go +++ b/common/persistence/cassandra/cluster_metadata_store.go @@ -64,7 +64,7 @@ func (m *ClusterMetadataStore) ListClusterMetadata( ctx context.Context, request *p.InternalListClusterMetadataRequest, ) (*p.InternalListClusterMetadataResponse, error) { - query := m.session.Query(templateListClusterMetadata, constMetadataPartition).WithContext(ctx) + query := m.session.Query(templateListClusterMetadata, constMetadataPartition).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() response := &p.InternalListClusterMetadataResponse{} @@ -106,7 +106,7 @@ func (m *ClusterMetadataStore) GetClusterMetadata( var encoding string var version int64 - query := m.session.Query(templateGetClusterMetadata, constMetadataPartition, request.ClusterName).WithContext(ctx) + query := m.session.Query(templateGetClusterMetadata, constMetadataPartition, request.ClusterName).WithContext(ctx).Idempotent(true) err := query.Scan(&clusterMetadata, &encoding, &version) if err != nil { return nil, gocql.ConvertError("GetClusterMetadata", err) @@ -159,7 +159,7 @@ func (m *ClusterMetadataStore) DeleteClusterMetadata( ctx context.Context, request *p.InternalDeleteClusterMetadataRequest, ) error { - query := m.session.Query(templateDeleteClusterMetadata, constMetadataPartition, request.ClusterName).WithContext(ctx) + query := m.session.Query(templateDeleteClusterMetadata, constMetadataPartition, request.ClusterName).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return gocql.ConvertError("DeleteClusterMetadata", err) } @@ -202,7 +202,7 @@ func (m *ClusterMetadataStore) GetClusterMembers( } queryString.WriteString(templateAllowFiltering) - query := m.session.Query(queryString.String(), operands...).WithContext(ctx) + query := m.session.Query(queryString.String(), operands...).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() @@ -257,7 +257,7 @@ func (m *ClusterMetadataStore) UpsertClusterMembership( request.SessionStart, time.Now().UTC(), int64(request.RecordExpiry.Seconds()), - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() if err != nil { diff --git a/common/persistence/cassandra/history_store.go b/common/persistence/cassandra/history_store.go index a2069e5a5f..fd7e209dec 100644 --- a/common/persistence/cassandra/history_store.go +++ b/common/persistence/cassandra/history_store.go @@ -77,7 +77,7 @@ func (h *HistoryStore) AppendHistoryNodes( node.TransactionID, node.Events.Data, node.Events.EncodingType.String(), - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return convertTimeoutError(gocql.ConvertError("AppendHistoryNodes", err)) } @@ -129,7 +129,7 @@ func (h *HistoryStore) DeleteHistoryNodes( branchID, nodeID, txnID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return gocql.ConvertError("DeleteHistoryNodes", err) } @@ -166,7 +166,7 @@ func (h *HistoryStore) ReadHistoryBranch( queryString = v2templateReadHistoryNode } - query := h.Session.Query(queryString, treeID, branchID, request.MinNodeID, request.MaxNodeID).WithContext(ctx) + query := h.Session.Query(queryString, treeID, branchID, request.MinNodeID, request.MaxNodeID).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() var pagingToken []byte @@ -255,7 +255,7 @@ func (h *HistoryStore) ForkHistoryBranch( if err != nil { return serviceerror.NewInternalf("ForkHistoryBranch - Gocql NewBranchID UUID cast failed. Error: %v", err) } - query := h.Session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.EncodingType.String()).WithContext(ctx) + query := h.Session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.EncodingType.String()).WithContext(ctx).Idempotent(true) err = query.Exec() if err != nil { return gocql.ConvertError("ForkHistoryBranch", err) @@ -303,7 +303,7 @@ func (h *HistoryStore) GetAllHistoryTreeBranches( request *p.GetAllHistoryTreeBranchesRequest, ) (*p.InternalGetAllHistoryTreeBranchesResponse, error) { - query := h.Session.Query(v2templateScanAllTreeBranches).WithContext(ctx) + query := h.Session.Query(v2templateScanAllTreeBranches).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() @@ -360,7 +360,7 @@ func (h *HistoryStore) GetHistoryTreeContainingBranch( if err != nil { return nil, serviceerror.NewInternalf("ReadHistoryBranch. Gocql TreeId UUID cast failed. Error: %v", err) } - query := h.Session.Query(v2templateReadAllBranches, treeID).WithContext(ctx) + query := h.Session.Query(v2templateReadAllBranches, treeID).WithContext(ctx).Idempotent(true) pageSize := 100 var pagingToken []byte diff --git a/common/persistence/cassandra/matching_task_store_queue.go b/common/persistence/cassandra/matching_task_store_queue.go index b23b540ae2..70605edf86 100644 --- a/common/persistence/cassandra/matching_task_store_queue.go +++ b/common/persistence/cassandra/matching_task_store_queue.go @@ -163,7 +163,7 @@ func (d *taskQueueStore) GetTaskQueue( request.TaskType, rowTypeTaskQueue, taskQueueTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) var rangeID int64 var tlBytes []byte diff --git a/common/persistence/cassandra/matching_task_store_user_data.go b/common/persistence/cassandra/matching_task_store_user_data.go index 41c0f66590..6122328472 100644 --- a/common/persistence/cassandra/matching_task_store_user_data.go +++ b/common/persistence/cassandra/matching_task_store_user_data.go @@ -48,7 +48,7 @@ func (d *userDataStore) GetTaskQueueUserData( query := d.Session.Query(templateGetTaskQueueUserDataQuery, request.NamespaceID, request.TaskQueue, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) var version int64 var userDataBytes []byte var encoding string @@ -134,7 +134,7 @@ func (d *userDataStore) UpdateTaskQueueUserData( } func (d *userDataStore) ListTaskQueueUserDataEntries(ctx context.Context, request *p.ListTaskQueueUserDataEntriesRequest) (*p.InternalListTaskQueueUserDataEntriesResponse, error) { - query := d.Session.Query(templateListTaskQueueUserDataQuery, request.NamespaceID).WithContext(ctx) + query := d.Session.Query(templateListTaskQueueUserDataQuery, request.NamespaceID).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() response := &p.InternalListTaskQueueUserDataEntriesResponse{} @@ -172,7 +172,7 @@ func (d *userDataStore) ListTaskQueueUserDataEntries(ctx context.Context, reques } func (d *userDataStore) GetTaskQueuesByBuildId(ctx context.Context, request *p.GetTaskQueuesByBuildIdRequest) ([]string, error) { - query := d.Session.Query(templateListTaskQueueNamesByBuildIdQuery, request.NamespaceID, request.BuildID).WithContext(ctx) + query := d.Session.Query(templateListTaskQueueNamesByBuildIdQuery, request.NamespaceID, request.BuildID).WithContext(ctx).Idempotent(true) iter := query.PageSize(listTaskQueueNamesByBuildIdPageSize).Iter() var taskQueues []string @@ -207,7 +207,7 @@ func (d *userDataStore) GetTaskQueuesByBuildId(ctx context.Context, request *p.G func (d *userDataStore) CountTaskQueuesByBuildId(ctx context.Context, request *p.CountTaskQueuesByBuildIdRequest) (int, error) { var count int - query := d.Session.Query(templateCountTaskQueueByBuildIdQuery, request.NamespaceID, request.BuildID).WithContext(ctx) + query := d.Session.Query(templateCountTaskQueueByBuildIdQuery, request.NamespaceID, request.BuildID).WithContext(ctx).Idempotent(true) err := query.Scan(&count) return count, err } diff --git a/common/persistence/cassandra/matching_task_store_v1.go b/common/persistence/cassandra/matching_task_store_v1.go index 87b85649b8..1109c470f1 100644 --- a/common/persistence/cassandra/matching_task_store_v1.go +++ b/common/persistence/cassandra/matching_task_store_v1.go @@ -150,7 +150,7 @@ func (d *matchingTaskStoreV1) GetTasks( rowTypeTaskInSubqueue(request.Subqueue), request.InclusiveMinTaskID, request.ExclusiveMaxTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetTasksResponse{} @@ -212,7 +212,7 @@ func (d *matchingTaskStoreV1) CompleteTasksLessThan( request.TaskType, rowTypeTaskInSubqueue(request.Subqueue), request.ExclusiveMaxTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() if err != nil { return 0, gocql.ConvertError("CompleteTasksLessThan", err) diff --git a/common/persistence/cassandra/matching_task_store_v2.go b/common/persistence/cassandra/matching_task_store_v2.go index 79b57851ed..1bb0bf5f27 100644 --- a/common/persistence/cassandra/matching_task_store_v2.go +++ b/common/persistence/cassandra/matching_task_store_v2.go @@ -169,7 +169,7 @@ func (d *matchingTaskStoreV2) GetTasks( int64(math.MaxInt64), ) } - iter := query.WithContext(ctx).PageSize(request.PageSize).PageState(request.NextPageToken).Iter() + iter := query.WithContext(ctx).Idempotent(true).PageSize(request.PageSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetTasksResponse{} task := make(map[string]any) @@ -236,7 +236,7 @@ func (d *matchingTaskStoreV2) CompleteTasksLessThan( rowType, request.ExclusiveMaxPass, request.ExclusiveMaxTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() if err != nil { return 0, gocql.ConvertError("CompleteTasksLessThan", err) diff --git a/common/persistence/cassandra/metadata_store.go b/common/persistence/cassandra/metadata_store.go index cb84913caf..1deefd6702 100644 --- a/common/persistence/cassandra/metadata_store.go +++ b/common/persistence/cassandra/metadata_store.go @@ -154,7 +154,7 @@ func (m *MetadataStore) CreateNamespaceInV2Table( defer func() { _ = iter.Close() }() deleteOrphanNamespace := func() { // Delete namespace from `namespaces_by_id` - if errDelete := m.session.Query(templateDeleteNamespaceQuery, request.ID).WithContext(ctx).Exec(); errDelete != nil { + if errDelete := m.session.Query(templateDeleteNamespaceQuery, request.ID).WithContext(ctx).Idempotent(true).Exec(); errDelete != nil { m.logger.Warn("Unable to delete orphan namespace record. Error", tag.Error(errDelete)) } } @@ -251,7 +251,7 @@ func (m *MetadataStore) RenameNamespace( if updateErr := m.session.Query(templateUpdateNamespaceByIdQuery, request.Name, request.Id, - ).WithContext(ctx).Exec(); updateErr != nil { + ).WithContext(ctx).Idempotent(true).Exec(); updateErr != nil { return gocql.ConvertError("RenameNamespace", updateErr) } @@ -316,14 +316,14 @@ func (m *MetadataStore) GetNamespace( namespace := request.Name if len(request.ID) > 0 { - query = m.session.Query(templateGetNamespaceQuery, request.ID).WithContext(ctx) + query = m.session.Query(templateGetNamespaceQuery, request.ID).WithContext(ctx).Idempotent(true) err = query.Scan(&namespace) if err != nil { return nil, handleError(request.Name, request.ID, err) } } - query = m.session.Query(templateGetNamespaceByNameQueryV2, constNamespacePartition, namespace).WithContext(ctx) + query = m.session.Query(templateGetNamespaceByNameQueryV2, constNamespacePartition, namespace).WithContext(ctx).Idempotent(true) err = query.Scan( nil, nil, @@ -352,7 +352,7 @@ func (m *MetadataStore) ListNamespaces( ctx context.Context, request *p.InternalListNamespacesRequest, ) (*p.InternalListNamespacesResponse, error) { - query := m.session.Query(templateListNamespaceQueryV2, constNamespacePartition).WithContext(ctx) + query := m.session.Query(templateListNamespaceQueryV2, constNamespacePartition).WithContext(ctx).Idempotent(true) pageSize := request.PageSize nextPageToken := request.NextPageToken response := &p.InternalListNamespacesResponse{} @@ -418,7 +418,7 @@ func (m *MetadataStore) DeleteNamespace( request *p.DeleteNamespaceRequest, ) error { var name string - query := m.session.Query(templateGetNamespaceQuery, request.ID).WithContext(ctx) + query := m.session.Query(templateGetNamespaceQuery, request.ID).WithContext(ctx).Idempotent(true) err := query.Scan(&name) if err != nil { if gocql.IsNotFoundError(err) { @@ -439,7 +439,7 @@ func (m *MetadataStore) DeleteNamespaceByName( request *p.DeleteNamespaceByNameRequest, ) error { var ID []byte - query := m.session.Query(templateGetNamespaceByNameQueryV2, constNamespacePartition, request.Name).WithContext(ctx) + query := m.session.Query(templateGetNamespaceByNameQueryV2, constNamespacePartition, request.Name).WithContext(ctx).Idempotent(true) err := query.Scan(&ID, nil, nil, nil, nil, nil) if err != nil { if gocql.IsNotFoundError(err) { @@ -454,7 +454,7 @@ func (m *MetadataStore) GetMetadata( ctx context.Context, ) (*p.GetMetadataResponse, error) { var notificationVersion int64 - query := m.session.Query(templateGetMetadataQueryV2, constNamespacePartition, namespaceMetadataRecordName).WithContext(ctx) + query := m.session.Query(templateGetMetadataQueryV2, constNamespacePartition, namespaceMetadataRecordName).WithContext(ctx).Idempotent(true) err := query.Scan(¬ificationVersion) if err != nil { if gocql.IsNotFoundError(err) { @@ -486,12 +486,12 @@ func (m *MetadataStore) updateMetadataBatch( } func (m *MetadataStore) deleteNamespace(ctx context.Context, name string, ID []byte) error { - query := m.session.Query(templateDeleteNamespaceByNameQueryV2, constNamespacePartition, name).WithContext(ctx) + query := m.session.Query(templateDeleteNamespaceByNameQueryV2, constNamespacePartition, name).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return gocql.ConvertError("DeleteNamespaceByName", err) } - query = m.session.Query(templateDeleteNamespaceQuery, ID).WithContext(ctx) + query = m.session.Query(templateDeleteNamespaceQuery, ID).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return gocql.ConvertError("DeleteNamespace", err) } diff --git a/common/persistence/cassandra/mutable_state_store.go b/common/persistence/cassandra/mutable_state_store.go index 9b0fc62757..207be6cabc 100644 --- a/common/persistence/cassandra/mutable_state_store.go +++ b/common/persistence/cassandra/mutable_state_store.go @@ -504,7 +504,7 @@ func (d *MutableStateStore) GetWorkflowExecution( request.RunID, defaultVisibilityTimestamp, rowTypeExecutionTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) result := make(map[string]any) if err := query.MapScan(result); err != nil { @@ -930,7 +930,7 @@ func (d *MutableStateStore) DeleteWorkflowExecution( request.RunID, defaultVisibilityTimestamp, rowTypeExecutionTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("DeleteWorkflowExecution", err) @@ -967,7 +967,7 @@ func (d *MutableStateStore) GetCurrentExecution( d.getCurrentRecordRunID(request.ArchetypeID), defaultVisibilityTimestamp, rowTypeExecutionTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) result := make(map[string]any) if err := query.MapScan(result); err != nil { @@ -1056,7 +1056,7 @@ func (d *MutableStateStore) ListConcreteExecutions( templateListWorkflowExecutionQuery, request.ShardID, rowTypeExecution, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.PageToken).Iter() response := &p.InternalListConcreteExecutionsResponse{} diff --git a/common/persistence/cassandra/mutable_state_task_store.go b/common/persistence/cassandra/mutable_state_task_store.go index a07dc3ca60..0a98101719 100644 --- a/common/persistence/cassandra/mutable_state_task_store.go +++ b/common/persistence/cassandra/mutable_state_task_store.go @@ -296,7 +296,7 @@ func (d *MutableStateTaskStore) getTransferTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetHistoryTasksResponse{} @@ -337,7 +337,7 @@ func (d *MutableStateTaskStore) completeTransferTask( rowTypeTransferRunID, defaultVisibilityTimestamp, request.TaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("CompleteTransferTask", err) @@ -356,7 +356,7 @@ func (d *MutableStateTaskStore) rangeCompleteTransferTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("RangeCompleteTransferTask", err) @@ -377,7 +377,7 @@ func (d *MutableStateTaskStore) getTimerTasks( rowTypeTimerRunID, minTimestamp, maxTimestamp, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetHistoryTasksResponse{} @@ -421,7 +421,7 @@ func (d *MutableStateTaskStore) completeTimerTask( rowTypeTimerRunID, ts, request.TaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("CompleteTimerTask", err) @@ -441,7 +441,7 @@ func (d *MutableStateTaskStore) rangeCompleteTimerTasks( rowTypeTimerRunID, start, end, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("RangeCompleteTimerTask", err) @@ -462,7 +462,7 @@ func (d *MutableStateTaskStore) getReplicationTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx).PageSize(request.BatchSize).PageState(request.NextPageToken) + ).WithContext(ctx).Idempotent(true).PageSize(request.BatchSize).PageState(request.NextPageToken) return d.populateGetReplicationTasksResponse(query, "GetReplicationTasks") } @@ -479,7 +479,7 @@ func (d *MutableStateTaskStore) completeReplicationTask( rowTypeReplicationRunID, defaultVisibilityTimestamp, request.TaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("CompleteReplicationTask", err) @@ -498,7 +498,7 @@ func (d *MutableStateTaskStore) rangeCompleteReplicationTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("RangeCompleteReplicationTask", err) @@ -525,7 +525,7 @@ func (d *MutableStateTaskStore) PutReplicationTaskToDLQ( datablob.EncodingType.String(), defaultVisibilityTimestamp, task.GetTaskId(), - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err = query.Exec() if err != nil { @@ -549,7 +549,7 @@ func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx).PageSize(request.BatchSize).PageState(request.NextPageToken) + ).WithContext(ctx).Idempotent(true).PageSize(request.BatchSize).PageState(request.NextPageToken) return d.populateGetReplicationTasksResponse(query, "GetReplicationTasksFromDLQ") } @@ -567,7 +567,7 @@ func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ( rowTypeDLQRunID, defaultVisibilityTimestamp, request.TaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("DeleteReplicationTaskFromDLQ", err) @@ -587,7 +587,7 @@ func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("RangeDeleteReplicationTaskFromDLQ", err) @@ -606,7 +606,7 @@ func (d *MutableStateTaskStore) IsReplicationDLQEmpty( rowTypeDLQRunID, defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) if err := query.Scan(nil); err != nil { if gocql.IsNotFoundError(err) { @@ -632,7 +632,7 @@ func (d *MutableStateTaskStore) getVisibilityTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetHistoryTasksResponse{} @@ -673,7 +673,7 @@ func (d *MutableStateTaskStore) completeVisibilityTask( rowTypeVisibilityTaskRunID, defaultVisibilityTimestamp, request.TaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("CompleteVisibilityTask", err) @@ -692,7 +692,7 @@ func (d *MutableStateTaskStore) rangeCompleteVisibilityTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("RangeCompleteVisibilityTask", err) @@ -760,7 +760,7 @@ func (d *MutableStateTaskStore) getHistoryImmedidateTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() @@ -807,7 +807,7 @@ func (d *MutableStateTaskStore) getHistoryScheduledTasks( rowTypeHistoryTaskRunID, minTimestamp, maxTimestamp, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() @@ -855,7 +855,7 @@ func (d *MutableStateTaskStore) completeHistoryTask( rowTypeHistoryTaskRunID, ts, request.TaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) err := query.Exec() return gocql.ConvertError("CompleteHistoryTask", err) @@ -877,7 +877,7 @@ func (d *MutableStateTaskStore) rangeCompleteHistoryTasks( defaultVisibilityTimestamp, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) } else { minTimestamp := p.UnixMilliseconds(request.InclusiveMinTaskKey.FireTime) maxTimestamp := p.UnixMilliseconds(request.ExclusiveMaxTaskKey.FireTime) @@ -889,7 +889,7 @@ func (d *MutableStateTaskStore) rangeCompleteHistoryTasks( rowTypeHistoryTaskRunID, minTimestamp, maxTimestamp, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) } err := query.Exec() diff --git a/common/persistence/cassandra/nexus_endpoint_store.go b/common/persistence/cassandra/nexus_endpoint_store.go index 6698199d42..3210f2d773 100644 --- a/common/persistence/cassandra/nexus_endpoint_store.go +++ b/common/persistence/cassandra/nexus_endpoint_store.go @@ -167,7 +167,7 @@ func (s *NexusEndpointStore) GetNexusEndpoint( ctx context.Context, request *p.GetNexusEndpointRequest, ) (*p.InternalNexusEndpoint, error) { - query := s.session.Query(templateGetEndpointByIdQuery, rowTypeNexusEndpoint, request.ID).WithContext(ctx) + query := s.session.Query(templateGetEndpointByIdQuery, rowTypeNexusEndpoint, request.ID).WithContext(ctx).Idempotent(true) var data []byte var dataEncoding string @@ -198,7 +198,7 @@ func (s *NexusEndpointStore) ListNexusEndpoints( response := &p.InternalListNexusEndpointsResponse{} - query := s.session.Query(templateListEndpointsQuery, rowTypeNexusEndpoint).WithContext(ctx) + query := s.session.Query(templateListEndpointsQuery, rowTypeNexusEndpoint).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() endpoints, err := s.getEndpointList(iter) @@ -291,7 +291,7 @@ func (s *NexusEndpointStore) listFirstPageWithVersion( ) (*p.InternalListNexusEndpointsResponse, error) { response := &p.InternalListNexusEndpointsResponse{} - query := s.session.Query(templateListEndpointsFirstPageQuery).WithContext(ctx) + query := s.session.Query(templateListEndpointsFirstPageQuery).WithContext(ctx).Idempotent(true) iter := query.PageSize(request.PageSize + 1).PageState(nil).Iter() // Use PageSize+1 to account for partitionStatus row partitionStateRow := make(map[string]any) @@ -330,7 +330,7 @@ func (s *NexusEndpointStore) listFirstPageWithVersion( } func (s *NexusEndpointStore) getTableVersion(ctx context.Context) (int64, error) { - query := s.session.Query(templateGetTableVersion, rowTypePartitionStatus, tableVersionEndpointID).WithContext(ctx) + query := s.session.Query(templateGetTableVersion, rowTypePartitionStatus, tableVersionEndpointID).WithContext(ctx).Idempotent(true) var version int64 if err := query.Scan(&version); err != nil { diff --git a/common/persistence/cassandra/queue_store.go b/common/persistence/cassandra/queue_store.go index 0a85830eb1..933a24ebd6 100644 --- a/common/persistence/cassandra/queue_store.go +++ b/common/persistence/cassandra/queue_store.go @@ -111,7 +111,7 @@ func (q *QueueStore) getLastMessageID( queueType persistence.QueueType, ) (int64, error) { - query := q.session.Query(templateGetLastMessageIDQuery, queueType).WithContext(ctx) + query := q.session.Query(templateGetLastMessageIDQuery, queueType).WithContext(ctx).Idempotent(true) result := make(map[string]any) err := query.MapScan(result) if err != nil { @@ -133,7 +133,7 @@ func (q *QueueStore) ReadMessages( q.queueType, lastMessageID, maxCount, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.Iter() @@ -165,7 +165,7 @@ func (q *QueueStore) ReadMessagesFromDLQ( q.getDLQTypeFromQueueType(), firstMessageID, lastMessageID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) iter := query.PageSize(pageSize).PageState(pageToken).Iter() var result []*persistence.QueueMessage @@ -192,7 +192,7 @@ func (q *QueueStore) DeleteMessagesBefore( messageID int64, ) error { - query := q.session.Query(templateDeleteMessagesBeforeQuery, q.queueType, messageID).WithContext(ctx) + query := q.session.Query(templateDeleteMessagesBeforeQuery, q.queueType, messageID).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return gocql.ConvertError("DeleteMessagesBefore", err) } @@ -205,7 +205,7 @@ func (q *QueueStore) DeleteMessageFromDLQ( ) error { // Use negative queue type as the dlq type - query := q.session.Query(templateDeleteMessageQuery, q.getDLQTypeFromQueueType(), messageID).WithContext(ctx) + query := q.session.Query(templateDeleteMessageQuery, q.getDLQTypeFromQueueType(), messageID).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return gocql.ConvertError("DeleteMessageFromDLQ", err) } @@ -220,7 +220,7 @@ func (q *QueueStore) RangeDeleteMessagesFromDLQ( ) error { // Use negative queue type as the dlq type - query := q.session.Query(templateDeleteMessagesQuery, q.getDLQTypeFromQueueType(), firstMessageID, lastMessageID).WithContext(ctx) + query := q.session.Query(templateDeleteMessagesQuery, q.getDLQTypeFromQueueType(), firstMessageID, lastMessageID).WithContext(ctx).Idempotent(true) if err := query.Exec(); err != nil { return gocql.ConvertError("RangeDeleteMessagesFromDLQ", err) } @@ -294,7 +294,7 @@ func (q *QueueStore) getQueueMetadata( queueType persistence.QueueType, ) (*persistence.InternalQueueMetadata, error) { - query := q.session.Query(templateGetQueueMetadataQuery, queueType).WithContext(ctx) + query := q.session.Query(templateGetQueueMetadataQuery, queueType).WithContext(ctx).Idempotent(true) message := make(map[string]any) err := query.MapScan(message) if err != nil { diff --git a/common/persistence/cassandra/queue_v2_store.go b/common/persistence/cassandra/queue_v2_store.go index c5f968fd44..75d89b3a88 100644 --- a/common/persistence/cassandra/queue_v2_store.go +++ b/common/persistence/cassandra/queue_v2_store.go @@ -160,7 +160,7 @@ func (s *queueV2Store) ReadMessages( 0, int(minMessageID), request.PageSize, - ).WithContext(ctx).Iter() + ).WithContext(ctx).Idempotent(true).Iter() var ( messages []persistence.QueueV2Message @@ -288,7 +288,7 @@ func (s *queueV2Store) RangeDeleteMessages( 0, // partition deleteRange.MinMessageID, deleteRange.MaxMessageID, - ).WithContext(ctx).Exec() + ).WithContext(ctx).Idempotent(true).Exec() if err != nil { return nil, gocql.ConvertError("QueueV2RangeDeleteMessages", err) } @@ -387,7 +387,7 @@ func GetQueue( version int64 ) - err := session.Query(TemplateGetQueueQuery, queueType, queueName).WithContext(ctx).Scan( + err := session.Query(TemplateGetQueueQuery, queueType, queueName).WithContext(ctx).Idempotent(true).Scan( &queueBytes, &queueEncodingStr, &version, @@ -454,7 +454,7 @@ func (s *queueV2Store) getMessageCountAndLastID( func (s *queueV2Store) getMaxMessageID(ctx context.Context, queueType persistence.QueueV2Type, queueName string) (int64, bool, error) { var maxMessageID int64 - err := s.session.Query(TemplateGetMaxMessageIDQuery, queueType, queueName, 0).WithContext(ctx).Scan(&maxMessageID) + err := s.session.Query(TemplateGetMaxMessageIDQuery, queueType, queueName, 0).WithContext(ctx).Idempotent(true).Scan(&maxMessageID) if err != nil { if gocql.IsNotFoundError(err) { return 0, false, nil @@ -474,7 +474,7 @@ func (s *queueV2Store) ListQueues( iter := s.session.Query( templateGetQueueNamesQuery, request.QueueType, - ).PageSize(request.PageSize).PageState(request.NextPageToken).WithContext(ctx).Iter() + ).PageSize(request.PageSize).PageState(request.NextPageToken).WithContext(ctx).Idempotent(true).Iter() var queues []persistence.QueueInfo for { diff --git a/common/persistence/cassandra/shard_store.go b/common/persistence/cassandra/shard_store.go index 8e5eca5944..410c075843 100644 --- a/common/persistence/cassandra/shard_store.go +++ b/common/persistence/cassandra/shard_store.go @@ -69,7 +69,7 @@ func (d *ShardStore) GetOrCreateShard( rowTypeShardRunID, defaultVisibilityTimestamp, rowTypeShardTaskID, - ).WithContext(ctx) + ).WithContext(ctx).Idempotent(true) var data []byte var encoding string