From e1f8dff679cf0debda6c3713bfa5e69248310e31 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 24 Apr 2026 20:46:38 +0800 Subject: [PATCH 1/3] reduce mpp logs Signed-off-by: gengliqi --- .../internal/mpp/local_mpp_coordinator.go | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index df4dbc154c71c..fd92c7703304c 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -209,7 +209,14 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT } zoneHelper := taskZoneInfoHelper{} zoneHelper.init(allTiFlashZoneInfo) - for _, mppTask := range pf.Sink.GetSelfTasks() { + tasks := pf.Sink.GetSelfTasks() + taskIDs := make([]int64, 0, len(tasks)) + addresses := make([]string, 0, len(tasks)) + rgName := c.sessionCtx.GetSessionVars().StmtCtx.ResourceGroupName + if !vardef.EnableResourceControl.Load() { + rgName = "" + } + for _, mppTask := range tasks { if mppTask.PartitionTableIDs != nil { err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs) } else if !mppTask.TiFlashStaticPrune { @@ -232,19 +239,8 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT return errors.Trace(err) } - rgName := c.sessionCtx.GetSessionVars().StmtCtx.ResourceGroupName - if !vardef.EnableResourceControl.Load() { - rgName = "" - } - logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs), - zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID), - zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()), - zap.String("plan", plannercore.ToString(pf.Sink)), - zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()), - zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()), - zap.Uint64("GatherID", c.gatherID), - zap.String("resource_group", rgName), - ) + taskIDs = append(taskIDs, mppTask.ID) + addresses = append(addresses, mppTask.Meta.GetAddress()) req := &kv.MPPDispatchRequest{ Data: pbData, Meta: mppTask.Meta, @@ -266,6 +262,19 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT c.reqMap[req.ID] = &mppRequestReport{mppReq: req, receivedReport: false, errMsg: "", executionSummaries: nil} c.mppReqs = append(c.mppReqs, req) } + if len(tasks) > 0 { + firstTask := tasks[0] + logutil.BgLogger().Info("Dispatch mpp tasks", zap.Uint64("timestamp", firstTask.StartTs), + zap.Int64s("IDs", taskIDs), zap.Int("task-count", len(taskIDs)), + zap.Uint64("QueryTs", firstTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", firstTask.MppQueryID.LocalQueryID), + zap.Uint64("ServerID", firstTask.MppQueryID.ServerID), zap.Strings("addresses", addresses), + zap.String("plan", plannercore.ToString(pf.Sink)), + zap.Int64("mpp-version", firstTask.MppVersion.ToInt64()), + zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()), + zap.Uint64("GatherID", c.gatherID), + zap.String("resource_group", rgName), + ) + } return nil } From 546b0535fe390272479b19f17d56723b237dd49e Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 24 Apr 2026 21:07:10 +0800 Subject: [PATCH 2/3] u Signed-off-by: gengliqi --- pkg/executor/internal/mpp/local_mpp_coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index fd92c7703304c..871f56bcf2778 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -265,7 +265,7 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT if len(tasks) > 0 { firstTask := tasks[0] logutil.BgLogger().Info("Dispatch mpp tasks", zap.Uint64("timestamp", firstTask.StartTs), - zap.Int64s("IDs", taskIDs), zap.Int("task-count", len(taskIDs)), + zap.Int64s("IDs", taskIDs), zap.Uint64("QueryTs", firstTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", firstTask.MppQueryID.LocalQueryID), zap.Uint64("ServerID", firstTask.MppQueryID.ServerID), zap.Strings("addresses", addresses), zap.String("plan", plannercore.ToString(pf.Sink)), From e5c4c54e94c3444c80387dfc618c81a39760eafc Mon Sep 17 00:00:00 2001 From: gengliqi Date: Wed, 29 Apr 2026 01:18:05 +0800 Subject: [PATCH 3/3] u Signed-off-by: gengliqi --- .../internal/mpp/local_mpp_coordinator.go | 60 +++++++++++++------ .../mpp/local_mpp_coordinator_test.go | 14 +++-- 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index 871f56bcf2778..339237b6fe7ac 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -194,7 +194,10 @@ func NewLocalMPPCoordinator(ctx context.Context, sctx sessionctx.Context, is inf return coord } -func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allTiFlashZoneInfo map[string]string) error { +func (c *localMppCoordinator) appendMPPDispatchReq( + pf *physicalop.Fragment, + allTiFlashStoreInfo map[string]tiFlashStoreInfo, +) error { dagReq, err := builder.ConstructDAGReq(c.sessionCtx, []base.PhysicalPlan{pf.Sink}, kv.TiFlash) if err != nil { return errors.Trace(err) @@ -208,10 +211,10 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT dagReq.EncodeType = tipb.EncodeType_TypeChunk } zoneHelper := taskZoneInfoHelper{} - zoneHelper.init(allTiFlashZoneInfo) + zoneHelper.init(allTiFlashStoreInfo) tasks := pf.Sink.GetSelfTasks() taskIDs := make([]int64, 0, len(tasks)) - addresses := make([]string, 0, len(tasks)) + storeIDs := make([]uint64, 0, len(tasks)) rgName := c.sessionCtx.GetSessionVars().StmtCtx.ResourceGroupName if !vardef.EnableResourceControl.Load() { rgName = "" @@ -232,7 +235,7 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT return err } zoneHelper.isRoot = pf.IsRoot - zoneHelper.currentTaskZone = zoneHelper.allTiFlashZoneInfo[mppTask.Meta.GetAddress()] + zoneHelper.currentTaskZone = zoneHelper.allTiFlashStoreInfo[mppTask.Meta.GetAddress()].zone zoneHelper.fillSameZoneFlagForExchange(dagReq.RootExecutor) pbData, err := dagReq.Marshal() if err != nil { @@ -240,7 +243,7 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT } taskIDs = append(taskIDs, mppTask.ID) - addresses = append(addresses, mppTask.Meta.GetAddress()) + storeIDs = append(storeIDs, allTiFlashStoreInfo[mppTask.Meta.GetAddress()].storeID) req := &kv.MPPDispatchRequest{ Data: pbData, Meta: mppTask.Meta, @@ -265,9 +268,9 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *physicalop.Fragment, allT if len(tasks) > 0 { firstTask := tasks[0] logutil.BgLogger().Info("Dispatch mpp tasks", zap.Uint64("timestamp", firstTask.StartTs), - zap.Int64s("IDs", taskIDs), + zap.Int64s("IDs", taskIDs), zap.Uint64s("storeIDs", storeIDs), zap.Uint64("QueryTs", firstTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", firstTask.MppQueryID.LocalQueryID), - zap.Uint64("ServerID", firstTask.MppQueryID.ServerID), zap.Strings("addresses", addresses), + zap.Uint64("ServerID", firstTask.MppQueryID.ServerID), zap.String("plan", plannercore.ToString(pf.Sink)), zap.Int64("mpp-version", firstTask.MppVersion.ToInt64()), zap.String("exchange-compression-mode", pf.Sink.GetCompressionMode().Name()), @@ -362,9 +365,22 @@ func (c *localMppCoordinator) fixTaskForCTEStorageAndReader(exec *tipb.Executor, return nil } +type tiFlashStoreInfo struct { + zone string + storeID uint64 +} + +func addTiFlashStoreInfo(allTiFlashStoreInfo map[string]tiFlashStoreInfo, tiflashStore *tikv.Store) { + storeInfo := tiFlashStoreInfo{storeID: tiflashStore.StoreID()} + if tiflashZone, isSet := tiflashStore.GetLabelValue(placement.DCLabelKey); isSet { + storeInfo.zone = tiflashZone + } + allTiFlashStoreInfo[tiflashStore.GetAddr()] = storeInfo +} + // taskZoneInfoHelper used to help reset exchange executor's same zone flags type taskZoneInfoHelper struct { - allTiFlashZoneInfo map[string]string + allTiFlashStoreInfo map[string]tiFlashStoreInfo // exchangeZoneInfo is used to cache one mpp task's zone info: // key is executor id, value is zone info array // for ExchangeSender, it's target tiflash nodes' zone info; for ExchangeReceiver, it's source tiflash nodes' zone info @@ -374,9 +390,9 @@ type taskZoneInfoHelper struct { isRoot bool } -func (h *taskZoneInfoHelper) init(allTiFlashZoneInfo map[string]string) { +func (h *taskZoneInfoHelper) init(allTiFlashStoreInfo map[string]tiFlashStoreInfo) { h.tidbZone = config.GetGlobalConfig().Labels[placement.DCLabelKey] - h.allTiFlashZoneInfo = allTiFlashZoneInfo + h.allTiFlashStoreInfo = allTiFlashStoreInfo // initial capacity to 2, for one exchange sender and one exchange receiver h.exchangeZoneInfo = make(map[string][]string, 2) } @@ -414,7 +430,7 @@ func (h *taskZoneInfoHelper) collectExchangeZoneInfos(encodedTaskMeta [][]byte, zoneInfos = append(zoneInfos, "") continue } - zoneInfos = append(zoneInfos, h.allTiFlashZoneInfo[taskMeta.GetAddress()]) + zoneInfos = append(zoneInfos, h.allTiFlashStoreInfo[taskMeta.GetAddress()].zone) } return zoneInfos } @@ -943,24 +959,30 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke } c.nodeCnt = len(nodeInfo) - var allTiFlashZoneInfo map[string]string + var allTiFlashStoreInfo map[string]tiFlashStoreInfo if c.sessionCtx.GetStore() == nil { - allTiFlashZoneInfo = make(map[string]string) + allTiFlashStoreInfo = make(map[string]tiFlashStoreInfo) } else if tikvStore, ok := c.sessionCtx.GetStore().(helper.Storage); ok { cache := tikvStore.GetRegionCache() allTiFlashStores := cache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode) - allTiFlashZoneInfo = make(map[string]string, len(allTiFlashStores)) + allTiFlashStoreInfo = make(map[string]tiFlashStoreInfo, len(allTiFlashStores)) for _, tiflashStore := range allTiFlashStores { - tiflashStoreAddr := tiflashStore.GetAddr() - if tiflashZone, isSet := tiflashStore.GetLabelValue(placement.DCLabelKey); isSet { - allTiFlashZoneInfo[tiflashStoreAddr] = tiflashZone + addTiFlashStoreInfo(allTiFlashStoreInfo, tiflashStore) + } + if config.GetGlobalConfig().DisaggregatedTiFlash { + computeStores, getStoreErr := cache.GetTiFlashComputeStores( + backoff.NewBackoffer(ctx, copr.CopNextMaxBackoff).TiKVBackoffer()) + if getStoreErr == nil { + for _, tiflashStore := range computeStores { + addTiFlashStoreInfo(allTiFlashStoreInfo, tiflashStore) + } } } } else { - allTiFlashZoneInfo = make(map[string]string) + allTiFlashStoreInfo = make(map[string]tiFlashStoreInfo) } for _, frag := range frags { - err = c.appendMPPDispatchReq(frag, allTiFlashZoneInfo) + err = c.appendMPPDispatchReq(frag, allTiFlashStoreInfo) if err != nil { return nil, nil, errors.Trace(err) } diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator_test.go b/pkg/executor/internal/mpp/local_mpp_coordinator_test.go index 6a48b0dff99c0..2ba70761f9220 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator_test.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator_test.go @@ -55,12 +55,16 @@ func TestNeedReportExecutionSummary(t *testing.T) { } func mockTaskZoneInfoHelper(isRoot bool, taskZone string, tidbZone string, storeZoneMpp map[string]string, exchangeZoneInfo map[string][]string) taskZoneInfoHelper { + allTiFlashStoreInfo := make(map[string]tiFlashStoreInfo, len(storeZoneMpp)) + for addr, zone := range storeZoneMpp { + allTiFlashStoreInfo[addr] = tiFlashStoreInfo{zone: zone} + } helper := taskZoneInfoHelper{ - tidbZone: tidbZone, - currentTaskZone: taskZone, - isRoot: isRoot, - allTiFlashZoneInfo: storeZoneMpp, - exchangeZoneInfo: exchangeZoneInfo, + tidbZone: tidbZone, + currentTaskZone: taskZone, + isRoot: isRoot, + allTiFlashStoreInfo: allTiFlashStoreInfo, + exchangeZoneInfo: exchangeZoneInfo, } return helper }