diff --git a/contrib/kvproto b/contrib/kvproto index 74c8557cdac..6a238795ae0 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 74c8557cdac0e1e3e87bcee4ba23cfb4f8fcd930 +Subproject commit 6a238795ae082905d3262fe642063bdcdbb1e973 diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 9b07a8c1709..43348a76cb8 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -48,11 +49,14 @@ #include #include #include +#include #include #include #include #include #include +#include +#include #include #include @@ -193,6 +197,76 @@ void updateSettingsForAutoSpill(ContextPtr & context, const LoggerPtr & log) LOG_WARNING(log, "auto spill is enabled, so per operator's memory threshold is disabled"); } } +/* Converts every protobuf key range in key_ranges into a ::Range whose start and end are rust::Slice values built from data() and size() of key_range.start() and key_range.end(). NOTE(review): the slices are constructed from pointers into the protobuf strings, so they presumably borrow instead of copying and key_ranges would have to outlive the returned vector - confirm against the ::Range definition. */ +rust::Vec<::Range> buildEstimateKeyRanges(const google::protobuf::RepeatedPtrField & key_ranges) +{ + rust::Vec<::Range> ranges; + for (const auto & key_range : key_ranges) + { + ranges.push_back({ + .start = rust::Slice( + reinterpret_cast(key_range.start().data()), + key_range.start().size()), + .end = rust::Slice( + reinterpret_cast(key_range.end().data()), + key_range.end().size()), + }); + } + return ranges; +} +/* Builds the TimezoneInfo for an estimate request: reset by time_zone_name() when it is non-empty, otherwise by time_zone_offset(). */ +TimezoneInfo buildEstimateTimezoneInfo(const coprocessor::TiCIEstimateCountRequest & request) +{ + TimezoneInfo timezone_info; + if (!request.time_zone_name().empty()) + timezone_info.resetByTimezoneName(request.time_zone_name()); + else + timezone_info.resetByTimezoneOffset(request.time_zone_offset()); + return timezone_info; +} +/* Parses request.fts_query_info() into a tipb::FTSQueryInfo. Throws TiFlashException with Errors::Coprocessor::BadRequest when parsing fails or when the parsed message has no match_expr entries. */ +tipb::FTSQueryInfo parseEstimateQueryInfo(const coprocessor::TiCIEstimateCountRequest & request) +{ + tipb::FTSQueryInfo query_info; + if (!query_info.ParseFromString(request.fts_query_info())) + throw TiFlashException("Failed to parse fts_query_info", Errors::Coprocessor::BadRequest); + if (query_info.match_expr_size() == 0) + throw TiFlashException("Empty TiCI 
estimate query expression", Errors::Coprocessor::BadRequest); + return query_info; +} +/* Builds one ::Shard per entry of request.shard_infos(): keyspace_id comes from the argument, index_id from the request, shard_id and shard_epoch from the entry. */ +rust::Vec<::Shard> buildEstimateShards(const coprocessor::TiCIEstimateCountRequest & request, UInt32 keyspace_id) +{ + rust::Vec<::Shard> shards; + for (const auto & shard_info : request.shard_infos()) + { + shards.push_back({ + .keyspace_id = keyspace_id, + .index_id = request.index_id(), + .shard_id = shard_info.shard_id(), + .shard_epoch = shard_info.shard_epoch(), + }); + } + return shards; +} +/* Builds a ::ShardWithRange (shard_id plus its converted key ranges) for each shard_infos entry i whose local_results[i] is true. Entries with i at or beyond local_results.size(), or with a false flag, are skipped, so the result may be shorter than shard_infos. */ +rust::Vec<::ShardWithRange> buildEstimateShardRanges( + const coprocessor::TiCIEstimateCountRequest & request, + const rust::Vec & local_results) +{ + rust::Vec<::ShardWithRange> shards; + for (int i = 0; i < request.shard_infos_size(); ++i) + { + const auto & shard_info = request.shard_infos(i); + if (i >= static_cast(local_results.size()) || !local_results[static_cast(i)]) + continue; + shards.push_back({ + .shard_id = shard_info.shard_id(), + .ranges = buildEstimateKeyRanges(shard_info.ranges()), + }); + } + return shards; +} } // namespace grpc::Status FlashService::Coprocessor( @@ -884,6 +958,63 @@ grpc::Status FlashService::Compact( return manual_compact_manager->handleRequest(request, response); } +/* gRPC handler that fills response est_count with the value returned by estimate_count for the requested shards. Returns the checkGrpcContext status when that check fails; every other path returns grpc::Status::OK, including the empty-request and no-usable-shard early returns (response left unmodified) and the exception paths (message stored in other_error). */grpc::Status FlashService::GetEstimateTiCICount( + grpc::ServerContext * grpc_context, + const coprocessor::TiCIEstimateCountRequest * request, + coprocessor::TiCIEstimateCountResponse * response) +{ + CPUAffinityManager::getInstance().bindSelfGrpcThread(); + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; + + try + { + if (request->shard_infos_size() == 0) + return grpc::Status::OK; + + const auto keyspace_id = RequestUtils::deriveKeyspaceID(request->context()); + const auto fts_query_info = parseEstimateQueryInfo(*request); + const auto timezone_info = buildEstimateTimezoneInfo(*request); + auto [query, column_ids] = TS::TiCIReadTaskPool::tipbToTiCIExpr(fts_query_info.match_expr(), timezone_info); +/* column_ids is not used by this handler; the cast marks it as intentionally ignored. */ (void)column_ids; + + auto 
shards = buildEstimateShards(*request, keyspace_id); + rust::Vec local_results; +/* The returned snapshot is never read. NOTE(review): presumably it is kept only so the shard snapshot stays alive until the end of this scope, and local_results is presumably filled by this call with one flag per entry of shards - confirm against the TiCI bridge declaration. */ [[maybe_unused]] auto snapshot = check_shards_and_acquire_snapshot(shards, local_results); + auto shard_ranges = buildEstimateShardRanges(*request, local_results); +/* No shard_infos entry had a true local_results flag, so there is nothing to estimate. */ if (shard_ranges.empty()) + return grpc::Status::OK; + + const auto estimate_result = estimate_count(keyspace_id, shard_ranges, query); + response->set_est_count(estimate_result.estimated_total_count); + LOG_INFO( + log, + "GetEstimateTiCICount done, est_count={}, input_shards={}, available_shards={}, sampled_shards={}", + response->est_count(), + request->shard_infos_size(), + estimate_result.available_shards, + estimate_result.sampled_shards); + } + catch (const pingcap::Exception & e) + { + LOG_WARNING(log, "GetEstimateTiCICount failed with KV exception: {}", e.message()); + response->set_other_error(e.message()); + } +/* NOTE(review): parseEstimateQueryInfo throws TiFlashException; unless that type derives from pingcap::Exception it is handled here and reported through what() - confirm that what() carries the full error text for it. */ catch (const std::exception & e) + { + LOG_WARNING(log, "GetEstimateTiCICount failed: {}", e.what()); + response->set_other_error(e.what()); + } + catch (...) 
+ { + LOG_WARNING(log, "GetEstimateTiCICount failed with unknown exception"); + response->set_other_error("other exception"); + } +/* Failures are reported through other_error above; the gRPC status itself stays OK. */ + return grpc::Status::OK; +} + grpc::Status FlashService::tryAddLock( grpc::ServerContext * grpc_context, const disaggregated::TryAddLockRequest * request, diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 0c87fa5493a..d4323131a6e 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -100,6 +100,10 @@ class FlashService const kvrpcpb::CompactRequest * request, kvrpcpb::CompactResponse * response) override; +/* Handler for the GetEstimateTiCICount RPC (overrides the base service method). */ grpc::Status GetEstimateTiCICount( + grpc::ServerContext * grpc_context, + const coprocessor::TiCIEstimateCountRequest * request, + coprocessor::TiCIEstimateCountResponse * response) override; // For S3 Lock Service grpc::Status tryAddLock( diff --git a/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h b/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h index 7d6df3745c1..c3054acd69f 100644 --- a/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h +++ b/dbms/src/Storages/Tantivy/TiCIReadTaskPool.h @@ -171,6 +171,7 @@ struct TiCIReadTaskPool bool is_count; std::shared_ptr> shards_snapshot; +/* Makes tipbToTiCIExpr accessible from outside the struct; FlashService::GetEstimateTiCICount calls it as TS::TiCIReadTaskPool::tipbToTiCIExpr. */public: static std::tuple<::Expr, std::vector> tipbToTiCIExpr( const tipb::Expr & expr, const TimezoneInfo & timezone_info)