Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 60 additions & 29 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ func NewLocalMPPCoordinator(ctx context.Context, sctx sessionctx.Context, is inf
return coord
}

func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allTiFlashZoneInfo map[string]string) error {
func (c *localMppCoordinator) appendMPPDispatchReq(
pf *physicalop.Fragment,
allTiFlashStoreInfo map[string]tiFlashStoreInfo,
) error {
dagReq, err := builder.ConstructDAGReq(c.sessionCtx, []base.PhysicalPlan{pf.Sink}, kv.TiFlash)
if err != nil {
return errors.Trace(err)
Expand All @@ -208,8 +211,15 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
dagReq.EncodeType = tipb.EncodeType_TypeChunk
}
zoneHelper := taskZoneInfoHelper{}
zoneHelper.init(allTiFlashZoneInfo)
for _, mppTask := range pf.Sink.GetSelfTasks() {
zoneHelper.init(allTiFlashStoreInfo)
tasks := pf.Sink.GetSelfTasks()
taskIDs := make([]int64, 0, len(tasks))
storeIDs := make([]uint64, 0, len(tasks))
rgName := c.sessionCtx.GetSessionVars().StmtCtx.ResourceGroupName
if !vardef.EnableResourceControl.Load() {
rgName = ""
}
for _, mppTask := range tasks {
if mppTask.PartitionTableIDs != nil {
err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
} else if !mppTask.TiFlashStaticPrune {
Expand All @@ -225,26 +235,15 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
return err
}
zoneHelper.isRoot = pf.IsRoot
zoneHelper.currentTaskZone = zoneHelper.allTiFlashZoneInfo[mppTask.Meta.GetAddress()]
zoneHelper.currentTaskZone = zoneHelper.allTiFlashStoreInfo[mppTask.Meta.GetAddress()].zone
zoneHelper.fillSameZoneFlagForExchange(dagReq.RootExecutor)
pbData, err := dagReq.Marshal()
if err != nil {
return errors.Trace(err)
}

rgName := c.sessionCtx.GetSessionVars().StmtCtx.ResourceGroupName
if !vardef.EnableResourceControl.Load() {
rgName = ""
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.Sink)),
zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()),
zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()),
zap.Uint64("GatherID", c.gatherID),
zap.String("resource_group", rgName),
)
taskIDs = append(taskIDs, mppTask.ID)
storeIDs = append(storeIDs, allTiFlashStoreInfo[mppTask.Meta.GetAddress()].storeID)
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
Expand All @@ -266,6 +265,19 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT
c.reqMap[req.ID] = &mppRequestReport{mppReq: req, receivedReport: false, errMsg: "", executionSummaries: nil}
c.mppReqs = append(c.mppReqs, req)
}
if len(tasks) > 0 {
firstTask := tasks[0]
logutil.BgLogger().Info("Dispatch mpp tasks", zap.Uint64("timestamp", firstTask.StartTs),
zap.Int64s("IDs", taskIDs), zap.Uint64s("storeIDs", storeIDs),
zap.Uint64("QueryTs", firstTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", firstTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", firstTask.MppQueryID.ServerID),
zap.String("plan", plannercore.ToString(pf.Sink)),
zap.Int64("mpp-version", firstTask.MppVersion.ToInt64()),
zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()),
zap.Uint64("GatherID", c.gatherID),
zap.String("resource_group", rgName),
)
}
return nil
}

Expand Down Expand Up @@ -353,9 +365,22 @@ func (c *localMppCoordinator) fixTaskForCTEStorageAndReader(exec *tipb.Executor,
return nil
}

type tiFlashStoreInfo struct {
zone string
storeID uint64
}

func addTiFlashStoreInfo(allTiFlashStoreInfo map[string]tiFlashStoreInfo, tiflashStore *tikv.Store) {
storeInfo := tiFlashStoreInfo{storeID: tiflashStore.StoreID()}
if tiflashZone, isSet := tiflashStore.GetLabelValue(placement.DCLabelKey); isSet {
storeInfo.zone = tiflashZone
}
allTiFlashStoreInfo[tiflashStore.GetAddr()] = storeInfo
}

// taskZoneInfoHelper used to help reset exchange executor's same zone flags
type taskZoneInfoHelper struct {
allTiFlashZoneInfo map[string]string
allTiFlashStoreInfo map[string]tiFlashStoreInfo
// exchangeZoneInfo is used to cache one mpp task's zone info:
// key is executor id, value is zone info array
// for ExchangeSender, it's target tiflash nodes' zone info; for ExchangeReceiver, it's source tiflash nodes' zone info
Expand All @@ -365,9 +390,9 @@ type taskZoneInfoHelper struct {
isRoot bool
}

func (h *taskZoneInfoHelper) init(allTiFlashZoneInfo map[string]string) {
func (h *taskZoneInfoHelper) init(allTiFlashStoreInfo map[string]tiFlashStoreInfo) {
h.tidbZone = config.GetGlobalConfig().Labels[placement.DCLabelKey]
h.allTiFlashZoneInfo = allTiFlashZoneInfo
h.allTiFlashStoreInfo = allTiFlashStoreInfo
// initial capacity to 2, for one exchange sender and one exchange receiver
h.exchangeZoneInfo = make(map[string][]string, 2)
}
Expand Down Expand Up @@ -405,7 +430,7 @@ func (h *taskZoneInfoHelper) collectExchangeZoneInfos(encodedTaskMeta [][]byte,
zoneInfos = append(zoneInfos, "")
continue
}
zoneInfos = append(zoneInfos, h.allTiFlashZoneInfo[taskMeta.GetAddress()])
zoneInfos = append(zoneInfos, h.allTiFlashStoreInfo[taskMeta.GetAddress()].zone)
}
return zoneInfos
}
Expand Down Expand Up @@ -934,24 +959,30 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke
}
c.nodeCnt = len(nodeInfo)

var allTiFlashZoneInfo map[string]string
var allTiFlashStoreInfo map[string]tiFlashStoreInfo
if c.sessionCtx.GetStore() == nil {
allTiFlashZoneInfo = make(map[string]string)
allTiFlashStoreInfo = make(map[string]tiFlashStoreInfo)
} else if tikvStore, ok := c.sessionCtx.GetStore().(helper.Storage); ok {
cache := tikvStore.GetRegionCache()
allTiFlashStores := cache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
allTiFlashZoneInfo = make(map[string]string, len(allTiFlashStores))
allTiFlashStoreInfo = make(map[string]tiFlashStoreInfo, len(allTiFlashStores))
for _, tiflashStore := range allTiFlashStores {
tiflashStoreAddr := tiflashStore.GetAddr()
if tiflashZone, isSet := tiflashStore.GetLabelValue(placement.DCLabelKey); isSet {
allTiFlashZoneInfo[tiflashStoreAddr] = tiflashZone
addTiFlashStoreInfo(allTiFlashStoreInfo, tiflashStore)
}
if config.GetGlobalConfig().DisaggregatedTiFlash {
computeStores, getStoreErr := cache.GetTiFlashComputeStores(
backoff.NewBackoffer(ctx, copr.CopNextMaxBackoff).TiKVBackoffer())
if getStoreErr == nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if getStoreErr is not nil?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Traffic that could cross zones might be counted as traffic within the same zone, and the store ID in the logs might be 0.

for _, tiflashStore := range computeStores {
addTiFlashStoreInfo(allTiFlashStoreInfo, tiflashStore)
}
}
}
} else {
allTiFlashZoneInfo = make(map[string]string)
allTiFlashStoreInfo = make(map[string]tiFlashStoreInfo)
}
for _, frag := range frags {
err = c.appendMPPDispatchReq(frag, allTiFlashZoneInfo)
err = c.appendMPPDispatchReq(frag, allTiFlashStoreInfo)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/executor/internal/mpp/local_mpp_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,16 @@ func TestNeedReportExecutionSummary(t *testing.T) {
}

func mockTaskZoneInfoHelper(isRoot bool, taskZone string, tidbZone string, storeZoneMpp map[string]string, exchangeZoneInfo map[string][]string) taskZoneInfoHelper {
allTiFlashStoreInfo := make(map[string]tiFlashStoreInfo, len(storeZoneMpp))
for addr, zone := range storeZoneMpp {
allTiFlashStoreInfo[addr] = tiFlashStoreInfo{zone: zone}
}
helper := taskZoneInfoHelper{
tidbZone: tidbZone,
currentTaskZone: taskZone,
isRoot: isRoot,
allTiFlashZoneInfo: storeZoneMpp,
exchangeZoneInfo: exchangeZoneInfo,
tidbZone: tidbZone,
currentTaskZone: taskZone,
isRoot: isRoot,
allTiFlashStoreInfo: allTiFlashStoreInfo,
exchangeZoneInfo: exchangeZoneInfo,
}
return helper
}
Expand Down
Loading