From b27be83c10f9daa71dbcd91c15daa8220d8dbe9d Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Thu, 23 Apr 2026 16:29:05 +0800 Subject: [PATCH 1/5] flash: handle TiCI estimate count rpc --- contrib/kvproto | 2 +- dbms/src/Flash/FlashService.cpp | 144 +++++++++++++++++++ dbms/src/Flash/FlashService.h | 4 + dbms/src/Storages/Tantivy/TiCIReadTaskPool.h | 1 + 4 files changed, 150 insertions(+), 1 deletion(-) diff --git a/contrib/kvproto b/contrib/kvproto index 74c8557cdac..3c4b56de90c 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 74c8557cdac0e1e3e87bcee4ba23cfb4f8fcd930 +Subproject commit 3c4b56de90c5a466944a8fa695db092d35071a05 diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 9b07a8c1709..1723a3e832c 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -48,11 +49,14 @@ #include #include #include +#include #include #include #include #include #include +#include +#include #include #include @@ -193,6 +197,80 @@ void updateSettingsForAutoSpill(ContextPtr & context, const LoggerPtr & log) LOG_WARNING(log, "auto spill is enabled, so per operator's memory threshold is disabled"); } } + +rust::Vec<::Range> buildEstimateKeyRanges(const google::protobuf::RepeatedPtrField & key_ranges) +{ + rust::Vec<::Range> ranges; + for (const auto & key_range : key_ranges) + { + ranges.push_back({ + .start = rust::Slice( + reinterpret_cast(key_range.start().data()), + key_range.start().size()), + .end = rust::Slice( + reinterpret_cast(key_range.end().data()), + key_range.end().size()), + }); + } + return ranges; +} + +TimezoneInfo buildEstimateTimezoneInfo(const coprocessor::TiCIEstimateCountRequest & request) +{ + TimezoneInfo timezone_info; + if (!request.time_zone_name().empty()) + timezone_info.resetByTimezoneName(request.time_zone_name()); + else + timezone_info.resetByTimezoneOffset(request.time_zone_offset()); + return timezone_info; +} + +tipb::FTSQueryInfo parseEstimateQueryInfo(const coprocessor::TiCIEstimateCountRequest & request) +{ + tipb::FTSQueryInfo query_info; + if (!query_info.ParseFromString(request.fts_query_info())) + throw TiFlashException("Failed to parse fts_query_info", Errors::Coprocessor::BadRequest); + if (query_info.match_expr_size() == 0) + throw TiFlashException("Empty TiCI estimate query expression", Errors::Coprocessor::BadRequest); + return query_info; +} + +rust::Vec<::Shard> buildEstimateShards(const coprocessor::TiCIEstimateCountRequest & request, UInt32 keyspace_id) +{ + rust::Vec<::Shard> shards; + for (const auto & shard_info : request.shard_infos()) + { + shards.push_back({ + .keyspace_id = keyspace_id, + .index_id = request.index_id(), + .shard_id = shard_info.shard_id(), + .shard_epoch = shard_info.shard_epoch(), + }); + } + return shards; +} + +rust::Vec<::ShardWithRange> buildEstimateShardRanges( + const coprocessor::TiCIEstimateCountRequest & request, + const rust::Vec & local_results, + coprocessor::TiCIEstimateCountResponse * response) +{ + rust::Vec<::ShardWithRange> shards; + for (int i = 0; i < request.shard_infos_size(); ++i) + { + const auto & shard_info = request.shard_infos(i); + if (i >= static_cast(local_results.size()) || !local_results[static_cast(i)]) + { + response->add_retry_shards()->CopyFrom(shard_info); + continue; + } + shards.push_back({ + .shard_id = shard_info.shard_id(), + .ranges = buildEstimateKeyRanges(shard_info.ranges()), + }); + } + return shards; +} } // namespace grpc::Status FlashService::Coprocessor( @@ -884,6 +962,72 @@ grpc::Status FlashService::Compact( return manual_compact_manager->handleRequest(request, response); } +grpc::Status FlashService::EstimateTiCICount( + grpc::ServerContext * grpc_context, + const coprocessor::TiCIEstimateCountRequest * request, + coprocessor::TiCIEstimateCountResponse * response) +{ + CPUAffinityManager::getInstance().bindSelfGrpcThread(); + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; + + try + { + if (request->shard_infos_size() == 0) + return grpc::Status::OK; + + const auto keyspace_id = RequestUtils::deriveKeyspaceID(request->context()); + const auto fts_query_info = parseEstimateQueryInfo(*request); + const auto timezone_info = buildEstimateTimezoneInfo(*request); + auto [query, column_ids] = TS::TiCIReadTaskPool::tipbToTiCIExpr(fts_query_info.match_expr(), timezone_info); + (void)column_ids; + + auto shards = buildEstimateShards(*request, keyspace_id); + rust::Vec local_results; + [[maybe_unused]] auto snapshot = check_shards_and_acquire_snapshot(shards, local_results); + auto shard_ranges = buildEstimateShardRanges(*request, local_results, response); + if (response->retry_shards_size() > 0) + { + LOG_INFO( + log, + "EstimateTiCICount requires shard refresh, retry_shards={}, input_shards={}", + response->retry_shards_size(), + request->shard_infos_size()); + return grpc::Status::OK; + } + if (shard_ranges.empty()) + return grpc::Status::OK; + + const auto estimate_result = estimate_count(keyspace_id, shard_ranges, query); + response->set_est_count(estimate_result.estimated_total_count); + LOG_INFO( + log, + "EstimateTiCICount done, est_count={}, input_shards={}, available_shards={}, sampled_shards={}", + response->est_count(), + request->shard_infos_size(), + estimate_result.available_shards, + estimate_result.sampled_shards); + } + catch (const pingcap::Exception & e) + { + LOG_WARNING(log, "EstimateTiCICount failed with KV exception: {}", e.message()); + response->set_other_error(e.message()); + } + catch (const std::exception & e) + { + LOG_WARNING(log, "EstimateTiCICount failed: {}", e.what()); + response->set_other_error(e.what()); + } + catch (...) + { + LOG_WARNING(log, "EstimateTiCICount failed with unknown exception"); + response->set_other_error("other exception"); + } + + return grpc::Status::OK; +} + grpc::Status FlashService::tryAddLock( grpc::ServerContext * grpc_context, const disaggregated::TryAddLockRequest * request, diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 0c87fa5493a..2e4cd92639c 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -100,6 +100,10 @@ class FlashService const kvrpcpb::CompactRequest * request, kvrpcpb::CompactResponse * response) override; + grpc::Status EstimateTiCICount( + grpc::ServerContext * grpc_context, + const coprocessor::TiCIEstimateCountRequest * request, + coprocessor::TiCIEstimateCountResponse * response) override; // For S3 Lock Service grpc::Status tryAddLock( diff --git a/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h b/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h index 7d6df3745c1..c3054acd69f 100644 --- a/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h +++ b/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h @@ -171,6 +171,7 @@ struct TiCIReadTaskPool bool is_count; std::shared_ptr> shards_snapshot; +public: static std::tuple<::Expr, std::vector> tipbToTiCIExpr( const tipb::Expr & expr, const TimezoneInfo & timezone_info) From 7d6434f0bd6f504bad3df94d86c9e2f7608abfc4 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Sun, 26 Apr 2026 20:17:26 +0800 Subject: [PATCH 2/5] flash: ignore missing shards in TiCI estimate --- contrib/kvproto | 2 +- dbms/src/Flash/FlashService.cpp | 27 +++++++-------------------- dbms/src/Flash/FlashService.h | 2 +- 3 files changed, 9 insertions(+), 22 deletions(-) diff --git a/contrib/kvproto b/contrib/kvproto index 3c4b56de90c..8d82ef2167a 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 3c4b56de90c5a466944a8fa695db092d35071a05 +Subproject commit 8d82ef2167a41b46ee1a372e95dd34686830e2a0 diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 1723a3e832c..43348a76cb8 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -252,18 +252,14 @@ rust::Vec<::Shard> buildEstimateShards(const coprocessor::TiCIEstimateCountReque rust::Vec<::ShardWithRange> buildEstimateShardRanges( const coprocessor::TiCIEstimateCountRequest & request, - const rust::Vec & local_results, - coprocessor::TiCIEstimateCountResponse * response) + const rust::Vec & local_results) { rust::Vec<::ShardWithRange> shards; for (int i = 0; i < request.shard_infos_size(); ++i) { const auto & shard_info = request.shard_infos(i); if (i >= static_cast(local_results.size()) || !local_results[static_cast(i)]) - { - response->add_retry_shards()->CopyFrom(shard_info); continue; - } shards.push_back({ .shard_id = shard_info.shard_id(), .ranges = buildEstimateKeyRanges(shard_info.ranges()), @@ -962,7 +958,7 @@ grpc::Status FlashService::Compact( return manual_compact_manager->handleRequest(request, response); } -grpc::Status FlashService::EstimateTiCICount( +grpc::Status FlashService::GetEstimateTiCICount( grpc::ServerContext * grpc_context, const coprocessor::TiCIEstimateCountRequest * request, coprocessor::TiCIEstimateCountResponse * response) @@ -986,16 +982,7 @@ grpc::Status FlashService::EstimateTiCICount( auto shards = buildEstimateShards(*request, keyspace_id); rust::Vec local_results; [[maybe_unused]] auto snapshot = check_shards_and_acquire_snapshot(shards, local_results); - auto shard_ranges = buildEstimateShardRanges(*request, local_results, response); - if (response->retry_shards_size() > 0) - { - LOG_INFO( - log, - "EstimateTiCICount requires shard refresh, retry_shards={}, input_shards={}", - response->retry_shards_size(), - request->shard_infos_size()); - return grpc::Status::OK; - } + auto shard_ranges = buildEstimateShardRanges(*request, local_results); if (shard_ranges.empty()) return grpc::Status::OK; @@ -1003,7 +990,7 @@ grpc::Status FlashService::EstimateTiCICount( response->set_est_count(estimate_result.estimated_total_count); LOG_INFO( log, - "EstimateTiCICount done, est_count={}, input_shards={}, available_shards={}, sampled_shards={}", + "GetEstimateTiCICount done, est_count={}, input_shards={}, available_shards={}, sampled_shards={}", response->est_count(), request->shard_infos_size(), estimate_result.available_shards, @@ -1011,17 +998,17 @@ grpc::Status FlashService::EstimateTiCICount( } catch (const pingcap::Exception & e) { - LOG_WARNING(log, "EstimateTiCICount failed with KV exception: {}", e.message()); + LOG_WARNING(log, "GetEstimateTiCICount failed with KV exception: {}", e.message()); response->set_other_error(e.message()); } catch (const std::exception & e) { - LOG_WARNING(log, "EstimateTiCICount failed: {}", e.what()); + LOG_WARNING(log, "GetEstimateTiCICount failed: {}", e.what()); response->set_other_error(e.what()); } catch (...) { - LOG_WARNING(log, "EstimateTiCICount failed with unknown exception"); + LOG_WARNING(log, "GetEstimateTiCICount failed with unknown exception"); response->set_other_error("other exception"); } diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 2e4cd92639c..d4323131a6e 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -100,7 +100,7 @@ class FlashService const kvrpcpb::CompactRequest * request, kvrpcpb::CompactResponse * response) override; - grpc::Status EstimateTiCICount( + grpc::Status GetEstimateTiCICount( grpc::ServerContext * grpc_context, const coprocessor::TiCIEstimateCountRequest * request, coprocessor::TiCIEstimateCountResponse * response) override; From 38ccef87b8697ef5dd5a1d4dff18e836ac6b5d86 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Sun, 26 Apr 2026 20:33:20 +0800 Subject: [PATCH 3/5] flash: update kvproto for TiCI estimate response --- contrib/kvproto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kvproto b/contrib/kvproto index 8d82ef2167a..3124d17bd7b 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 8d82ef2167a41b46ee1a372e95dd34686830e2a0 +Subproject commit 3124d17bd7b269f3ad437915feaf61bfa6313d8c From 923684a3219ce944c4d569b2a9da4d80aa22a4e8 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Sun, 26 Apr 2026 21:09:24 +0800 Subject: [PATCH 4/5] flash: use feature fts based kvproto --- contrib/kvproto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kvproto b/contrib/kvproto index 3124d17bd7b..1ac7eff516d 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 3124d17bd7b269f3ad437915feaf61bfa6313d8c +Subproject commit 1ac7eff516d566461f7d9a4818f41e0b40decddc From 230fc8029d7b6a531e512a0c3bf8de18d37b724c Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 29 Apr 2026 14:44:35 +0800 Subject: [PATCH 5/5] contrib: update kvproto to feature fts Signed-off-by: wshwsh12 <793703860@qq.com> --- contrib/kvproto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kvproto b/contrib/kvproto index 1ac7eff516d..6a238795ae0 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 1ac7eff516d566461f7d9a4818f41e0b40decddc +Subproject commit 6a238795ae082905d3262fe642063bdcdbb1e973