Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,21 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
}
zoneHelper := taskZoneInfoHelper{}
zoneHelper.init(allTiFlashZoneInfo)
_, stmtDigest := c.sessionCtx.GetSessionVars().StmtCtx.SQLDigest()
sqlDigest := ""
if stmtDigest != nil {
sqlDigest = stmtDigest.String()
}
_, planDigest := c.sessionCtx.GetSessionVars().StmtCtx.GetPlanDigest()
planDigestStr := ""
if planDigest != nil {
planDigestStr = planDigest.String()
}
for _, mppTask := range pf.Sink.GetSelfTasks() {
if mppTask.PartitionTableIDs != nil {
err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
} else if !mppTask.TiFlashStaticPrune {
// If isDisaggregatedTiFlashStaticPrune is true, it means this TableScan is under PartitionUnoin,
// If isDisaggregatedTiFlashStaticPrune is true, it means this TableScan is under PartitionUnion,
// tableID in TableScan is already the physical table id of this partition, no need to update again.
err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, []int64{mppTask.TableID})
}
Expand Down Expand Up @@ -244,6 +254,8 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()),
zap.Uint64("GatherID", c.gatherID),
zap.String("resource_group", rgName),
zap.String("sqlDigest", sqlDigest),
zap.String("planDigest", planDigestStr),
)
req := &kv.MPPDispatchRequest{
Data: pbData,
Expand All @@ -262,6 +274,8 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
ResourceGroupName: rgName,
ConnectionID: c.sessionCtx.GetSessionVars().ConnectionID,
ConnectionAlias: c.sessionCtx.ShowProcess().SessionAlias,
SQLDigest: sqlDigest,
PlanDigest: planDigestStr,
}
c.reqMap[req.ID] = &mppRequestReport{mppReq: req, receivedReport: false, errMsg: "", executionSummaries: nil}
c.mppReqs = append(c.mppReqs, req)
Expand Down Expand Up @@ -606,14 +620,14 @@ func (c *localMppCoordinator) handleDispatchReq(ctx context.Context, bo *backoff
})
if retry {
// TODO: If we want to retry, we might need to redo the plan fragment cutting and task scheduling. https://github.com/pingcap/tidb/issues/31015
logutil.BgLogger().Warn("mpp dispatch meet error and retrying", zap.Error(err), zap.Uint64("timestamp", c.startTS), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()))
logutil.BgLogger().Warn("mpp dispatch meet error and retrying", zap.Error(err), zap.Uint64("timestamp", c.startTS), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()), zap.String("sqlDigest", req.SQLDigest), zap.String("planDigest", req.PlanDigest))
continue
}
break
}

if err != nil {
logutil.BgLogger().Warn("mpp dispatch meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()))
logutil.BgLogger().Warn("mpp dispatch meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("mpp-version", req.MppVersion.ToInt64()), zap.String("sqlDigest", req.SQLDigest), zap.String("planDigest", req.PlanDigest))
atomic.CompareAndSwapUint32(&c.dispatchFailed, 0, 1)
// if NeedTriggerFallback is true, we return timeout to trigger tikv's fallback
if c.needTriggerFallback {
Expand All @@ -624,7 +638,7 @@ func (c *localMppCoordinator) handleDispatchReq(ctx context.Context, bo *backoff
}

if rpcResp.Error != nil {
logutil.BgLogger().Warn("mpp dispatch response meet error", zap.String("error", rpcResp.Error.Msg), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("task-mpp-version", req.MppVersion.ToInt64()), zap.Int64("error-mpp-version", rpcResp.Error.GetMppVersion()))
logutil.BgLogger().Warn("mpp dispatch response meet error", zap.String("error", rpcResp.Error.Msg), zap.Uint64("timestamp", req.StartTs), zap.Int64("task", req.ID), zap.Int64("task-mpp-version", req.MppVersion.ToInt64()), zap.Int64("error-mpp-version", rpcResp.Error.GetMppVersion()), zap.String("sqlDigest", req.SQLDigest), zap.String("planDigest", req.PlanDigest))
atomic.CompareAndSwapUint32(&c.dispatchFailed, 0, 1)
c.sendError(errors.New(rpcResp.Error.Msg))
return
Expand All @@ -645,6 +659,8 @@ func (c *localMppCoordinator) handleDispatchReq(ctx context.Context, bo *backoff
Address: req.Meta.GetAddress(),
MppVersion: req.MppVersion.ToInt64(),
ResourceGroupName: req.ResourceGroupName,
SqlDigest: req.SQLDigest,
PlanDigest: req.PlanDigest,
}
c.receiveResults(req, taskMeta, bo)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ type MPPDispatchRequest struct {
ResourceGroupName string
ConnectionID uint64
ConnectionAlias string
SQLDigest string
PlanDigest string
}

// CancelMPPTasksParam represents parameter for MPPClient's CancelMPPTasks
Expand Down Expand Up @@ -242,7 +244,7 @@ type MPPBuildTasksRequest struct {
// ToString returns a string representation of MPPBuildTasksRequest. Used for CacheKey.
func (req *MPPBuildTasksRequest) ToString() string {
sb := strings.Builder{}
if req.KeyRanges != nil { // Non-partiton
if req.KeyRanges != nil { // Non-partition
for i, keyRange := range req.KeyRanges {
sb.WriteString("range_id" + strconv.Itoa(i))
sb.WriteString(keyRange.StartKey.String())
Expand Down
6 changes: 5 additions & 1 deletion pkg/store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func (c *MPPClient) DispatchMPPTask(param kv.DispatchMPPTaskParam) (resp *mpp.Di
ResourceGroupName: req.ResourceGroupName,
ConnectionId: req.ConnectionID,
ConnectionAlias: req.ConnectionAlias,
SqlDigest: req.SQLDigest,
PlanDigest: req.PlanDigest,
}

mppReq := &mpp.DispatchTaskRequest{
Expand Down Expand Up @@ -200,7 +202,7 @@ func (c *MPPClient) CancelMPPTasks(param kv.CancelMPPTasksParam) {

firstReq := reqs[0]
killReq := &mpp.CancelTaskRequest{
Meta: &mpp.TaskMeta{StartTs: firstReq.StartTs, GatherId: firstReq.GatherID, QueryTs: firstReq.MppQueryID.QueryTs, LocalQueryId: firstReq.MppQueryID.LocalQueryID, ServerId: firstReq.MppQueryID.ServerID, MppVersion: firstReq.MppVersion.ToInt64(), ResourceGroupName: firstReq.ResourceGroupName},
Meta: &mpp.TaskMeta{StartTs: firstReq.StartTs, GatherId: firstReq.GatherID, QueryTs: firstReq.MppQueryID.QueryTs, LocalQueryId: firstReq.MppQueryID.LocalQueryID, ServerId: firstReq.MppQueryID.ServerID, MppVersion: firstReq.MppVersion.ToInt64(), ResourceGroupName: firstReq.ResourceGroupName, SqlDigest: firstReq.SQLDigest, PlanDigest: firstReq.PlanDigest},
}

wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{})
Expand Down Expand Up @@ -244,6 +246,8 @@ func (c *MPPClient) EstablishMPPConns(param kv.EstablishMPPConnsParam) (*tikvrpc
MppVersion: req.MppVersion.ToInt64(),
TaskId: -1,
ResourceGroupName: req.ResourceGroupName,
SqlDigest: req.SQLDigest,
PlanDigest: req.PlanDigest,
},
}

Expand Down
Loading