diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go
index df4dbc154c71c..ca4664ce216ec 100644
--- a/pkg/executor/internal/mpp/local_mpp_coordinator.go
+++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go
@@ -209,11 +209,21 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
 	}
 	zoneHelper := taskZoneInfoHelper{}
 	zoneHelper.init(allTiFlashZoneInfo)
+	_, stmtDigest := c.sessionCtx.GetSessionVars().StmtCtx.SQLDigest()
+	sqlDigest := ""
+	if stmtDigest != nil {
+		sqlDigest = stmtDigest.String()
+	}
+	_, planDigest := c.sessionCtx.GetSessionVars().StmtCtx.GetPlanDigest()
+	planDigestStr := ""
+	if planDigest != nil {
+		planDigestStr = planDigest.String()
+	}
 	for _, mppTask := range pf.Sink.GetSelfTasks() {
 		if mppTask.PartitionTableIDs != nil {
 			err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
 		} else if !mppTask.TiFlashStaticPrune {
-			// If isDisaggregatedTiFlashStaticPrune is true, it means this TableScan is under PartitionUnoin,
+			// If isDisaggregatedTiFlashStaticPrune is true, it means this TableScan is under PartitionUnion,
 			// tableID in TableScan is already the physical table id of this partition, no need to update again.
 			err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, []int64{mppTask.TableID})
 		}
@@ -244,6 +254,8 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
 			zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()),
 			zap.Uint64("GatherID", c.gatherID),
 			zap.String("resource_group", rgName),
+			zap.String("sqlDigest", sqlDigest),
+			zap.String("planDigest", planDigestStr),
 		)
 		req := &kv.MPPDispatchRequest{
 			Data:              pbData,
@@ -262,6 +274,8 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
 			ResourceGroupName: rgName,
 			ConnectionID:      c.sessionCtx.GetSessionVars().ConnectionID,
 			ConnectionAlias:   c.sessionCtx.ShowProcess().SessionAlias,
+			SQLDigest:         sqlDigest,
+			PlanDigest:        planDigestStr,
 		}
 		c.reqMap[req.ID] = &mppRequestReport{mppReq: req, receivedReport: false, errMsg: "", executionSummaries: nil}
 		c.mppReqs = append(c.mppReqs, req)
@@ -606,14 +620,14 @@ func (c *localMppCoordinator) handleDispatchReq(ctx context.Context, bo *backoff
 		})
 		if retry {
 			// TODO: If we want to retry, we might need to redo the plan fragment cutting and task scheduling. https://github.com/pingcap/tidb/issues/31015
-			logutil.BgLogger().Warn("mpp dispatch meet error and retrying", zap.Error(err), zap.Uint64("timestamp", c.startTS), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()))
+			logutil.BgLogger().Warn("mpp dispatch meet error and retrying", zap.Error(err), zap.Uint64("timestamp", c.startTS), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()), zap.String("sqlDigest", req.SQLDigest), zap.String("planDigest", req.PlanDigest))
 			continue
 		}
 		break
 	}
 
 	if err != nil {
-		logutil.BgLogger().Warn("mpp dispatch meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()))
+		logutil.BgLogger().Warn("mpp dispatch meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()), zap.String("sqlDigest", req.SQLDigest), zap.String("planDigest", req.PlanDigest))
 		atomic.CompareAndSwapUint32(&c.dispatchFailed, 0, 1)
 		// if NeedTriggerFallback is true, we return timeout to trigger tikv's fallback
 		if c.needTriggerFallback {
@@ -624,7 +638,7 @@ func (c *localMppCoordinator) handleDispatchReq(ctx context.Context, bo *backoff
 	}
 
 	if rpcResp.Error != nil {
-		logutil.BgLogger().Warn("mpp dispatch response meet error", zap.String("error", rpcResp.Error.Msg), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("task-mpp-version", req.MppVersion.ToInt64()), zap.Int64("error-mpp-version", rpcResp.Error.GetMppVersion()))
+		logutil.BgLogger().Warn("mpp dispatch response meet error", zap.String("error", rpcResp.Error.Msg), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("task-mpp-version", req.MppVersion.ToInt64()), zap.Int64("error-mpp-version", rpcResp.Error.GetMppVersion()), zap.String("sqlDigest", req.SQLDigest), zap.String("planDigest", req.PlanDigest))
 		atomic.CompareAndSwapUint32(&c.dispatchFailed, 0, 1)
 		c.sendError(errors.New(rpcResp.Error.Msg))
 		return
@@ -645,6 +659,8 @@ func (c *localMppCoordinator) handleDispatchReq(ctx context.Context, bo *backoff
 		Address:           req.Meta.GetAddress(),
 		MppVersion:        req.MppVersion.ToInt64(),
 		ResourceGroupName: req.ResourceGroupName,
+		SqlDigest:         req.SQLDigest,
+		PlanDigest:        req.PlanDigest,
 	}
 	c.receiveResults(req, taskMeta, bo)
 }
diff --git a/pkg/kv/mpp.go b/pkg/kv/mpp.go
index d66ab382969ed..f56738aab639d 100644
--- a/pkg/kv/mpp.go
+++ b/pkg/kv/mpp.go
@@ -162,6 +162,8 @@ type MPPDispatchRequest struct {
 	ResourceGroupName string
 	ConnectionID      uint64
 	ConnectionAlias   string
+	SQLDigest         string
+	PlanDigest        string
 }
 
 // CancelMPPTasksParam represents parameter for MPPClient's CancelMPPTasks
@@ -242,7 +244,7 @@ type MPPBuildTasksRequest struct {
 // ToString returns a string representation of MPPBuildTasksRequest. Used for CacheKey.
 func (req *MPPBuildTasksRequest) ToString() string {
 	sb := strings.Builder{}
-	if req.KeyRanges != nil { // Non-partiton
+	if req.KeyRanges != nil { // Non-partition
 		for i, keyRange := range req.KeyRanges {
 			sb.WriteString("range_id" + strconv.Itoa(i))
 			sb.WriteString(keyRange.StartKey.String())
diff --git a/pkg/store/copr/mpp.go b/pkg/store/copr/mpp.go
index 3770b646ae742..181131e5b1169 100644
--- a/pkg/store/copr/mpp.go
+++ b/pkg/store/copr/mpp.go
@@ -111,6 +111,8 @@ func (c *MPPClient) DispatchMPPTask(param kv.DispatchMPPTaskParam) (resp *mpp.Di
 		ResourceGroupName: req.ResourceGroupName,
 		ConnectionId:      req.ConnectionID,
 		ConnectionAlias:   req.ConnectionAlias,
+		SqlDigest:         req.SQLDigest,
+		PlanDigest:        req.PlanDigest,
 	}
 
 	mppReq := &mpp.DispatchTaskRequest{
@@ -200,7 +202,7 @@ func (c *MPPClient) CancelMPPTasks(param kv.CancelMPPTasksParam) {
 
 	firstReq := reqs[0]
 	killReq := &mpp.CancelTaskRequest{
-		Meta: &mpp.TaskMeta{StartTs: firstReq.StartTs, GatherId: firstReq.GatherID, QueryTs: firstReq.MppQueryID.QueryTs, LocalQueryId: firstReq.MppQueryID.LocalQueryID, ServerId: firstReq.MppQueryID.ServerID, MppVersion: firstReq.MppVersion.ToInt64(), ResourceGroupName: firstReq.ResourceGroupName},
+		Meta: &mpp.TaskMeta{StartTs: firstReq.StartTs, GatherId: firstReq.GatherID, QueryTs: firstReq.MppQueryID.QueryTs, LocalQueryId: firstReq.MppQueryID.LocalQueryID, ServerId: firstReq.MppQueryID.ServerID, MppVersion: firstReq.MppVersion.ToInt64(), ResourceGroupName: firstReq.ResourceGroupName, SqlDigest: firstReq.SQLDigest, PlanDigest: firstReq.PlanDigest},
 	}
 
 	wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{})
@@ -244,6 +246,8 @@ func (c *MPPClient) EstablishMPPConns(param kv.EstablishMPPConnsParam) (*tikvrpc
 			MppVersion:        req.MppVersion.ToInt64(),
 			TaskId:            -1,
 			ResourceGroupName: req.ResourceGroupName,
+			SqlDigest:         req.SQLDigest,
+			PlanDigest:        req.PlanDigest,
 		},
 	}