Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <IO/IOThreadPools.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Interpreters/TimezoneInfo.h>
#include <Interpreters/executeQuery.h>
#include <Poco/Message.h>
#include <Server/IServer.h>
Expand All @@ -48,11 +49,14 @@
#include <Storages/KVStore/Read/RegionException.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/S3/S3Common.h>
#include <Storages/Tantivy/TiCIReadTaskPool.h>
#include <common/logger_useful.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/support/status.h>
#include <grpcpp/support/status_code_enum.h>
#include <kvproto/disaggregated.pb.h>
#include <tici-search-lib/src/lib.rs.h>
#include <tipb/executor.pb.h>

#include <chrono>
#include <ext/scope_guard.h>
Expand Down Expand Up @@ -193,6 +197,76 @@ void updateSettingsForAutoSpill(ContextPtr & context, const LoggerPtr & log)
LOG_WARNING(log, "auto spill is enabled, so per operator's memory threshold is disabled");
}
}

rust::Vec<::Range> buildEstimateKeyRanges(const google::protobuf::RepeatedPtrField<coprocessor::KeyRange> & key_ranges)
{
rust::Vec<::Range> ranges;
for (const auto & key_range : key_ranges)
{
ranges.push_back({
.start = rust::Slice<const std::uint8_t>(
reinterpret_cast<const std::uint8_t *>(key_range.start().data()),
key_range.start().size()),
.end = rust::Slice<const std::uint8_t>(
reinterpret_cast<const std::uint8_t *>(key_range.end().data()),
key_range.end().size()),
});
}
return ranges;
}

TimezoneInfo buildEstimateTimezoneInfo(const coprocessor::TiCIEstimateCountRequest & request)
{
TimezoneInfo timezone_info;
if (!request.time_zone_name().empty())
timezone_info.resetByTimezoneName(request.time_zone_name());
else
timezone_info.resetByTimezoneOffset(request.time_zone_offset());
return timezone_info;
}

tipb::FTSQueryInfo parseEstimateQueryInfo(const coprocessor::TiCIEstimateCountRequest & request)
{
tipb::FTSQueryInfo query_info;
if (!query_info.ParseFromString(request.fts_query_info()))
throw TiFlashException("Failed to parse fts_query_info", Errors::Coprocessor::BadRequest);
if (query_info.match_expr_size() == 0)
throw TiFlashException("Empty TiCI estimate query expression", Errors::Coprocessor::BadRequest);
return query_info;
}

rust::Vec<::Shard> buildEstimateShards(const coprocessor::TiCIEstimateCountRequest & request, UInt32 keyspace_id)
{
rust::Vec<::Shard> shards;
for (const auto & shard_info : request.shard_infos())
{
shards.push_back({
.keyspace_id = keyspace_id,
.index_id = request.index_id(),
.shard_id = shard_info.shard_id(),
.shard_epoch = shard_info.shard_epoch(),
});
}
return shards;
}

rust::Vec<::ShardWithRange> buildEstimateShardRanges(
const coprocessor::TiCIEstimateCountRequest & request,
const rust::Vec<bool> & local_results)
{
rust::Vec<::ShardWithRange> shards;
for (int i = 0; i < request.shard_infos_size(); ++i)
{
const auto & shard_info = request.shard_infos(i);
if (i >= static_cast<int>(local_results.size()) || !local_results[static_cast<size_t>(i)])
continue;
shards.push_back({
.shard_id = shard_info.shard_id(),
.ranges = buildEstimateKeyRanges(shard_info.ranges()),
});
}
return shards;
}
} // namespace

grpc::Status FlashService::Coprocessor(
Expand Down Expand Up @@ -884,6 +958,63 @@ grpc::Status FlashService::Compact(
return manual_compact_manager->handleRequest(request, response);
}

grpc::Status FlashService::GetEstimateTiCICount(
grpc::ServerContext * grpc_context,
const coprocessor::TiCIEstimateCountRequest * request,
coprocessor::TiCIEstimateCountResponse * response)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

try
{
if (request->shard_infos_size() == 0)
return grpc::Status::OK;

const auto keyspace_id = RequestUtils::deriveKeyspaceID(request->context());
const auto fts_query_info = parseEstimateQueryInfo(*request);
const auto timezone_info = buildEstimateTimezoneInfo(*request);
auto [query, column_ids] = TS::TiCIReadTaskPool::tipbToTiCIExpr(fts_query_info.match_expr(), timezone_info);
(void)column_ids;

auto shards = buildEstimateShards(*request, keyspace_id);
rust::Vec<bool> local_results;
[[maybe_unused]] auto snapshot = check_shards_and_acquire_snapshot(shards, local_results);
auto shard_ranges = buildEstimateShardRanges(*request, local_results);
if (shard_ranges.empty())
return grpc::Status::OK;

const auto estimate_result = estimate_count(keyspace_id, shard_ranges, query);
response->set_est_count(estimate_result.estimated_total_count);
LOG_INFO(
log,
"GetEstimateTiCICount done, est_count={}, input_shards={}, available_shards={}, sampled_shards={}",
response->est_count(),
request->shard_infos_size(),
estimate_result.available_shards,
estimate_result.sampled_shards);
}
catch (const pingcap::Exception & e)
{
LOG_WARNING(log, "GetEstimateTiCICount failed with KV exception: {}", e.message());
response->set_other_error(e.message());
}
catch (const std::exception & e)
{
LOG_WARNING(log, "GetEstimateTiCICount failed: {}", e.what());
response->set_other_error(e.what());
}
catch (...)
{
LOG_WARNING(log, "GetEstimateTiCICount failed with unknown exception");
response->set_other_error("other exception");
}

return grpc::Status::OK;
}

grpc::Status FlashService::tryAddLock(
grpc::ServerContext * grpc_context,
const disaggregated::TryAddLockRequest * request,
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class FlashService
const kvrpcpb::CompactRequest * request,
kvrpcpb::CompactResponse * response) override;

grpc::Status GetEstimateTiCICount(
grpc::ServerContext * grpc_context,
const coprocessor::TiCIEstimateCountRequest * request,
coprocessor::TiCIEstimateCountResponse * response) override;

// For S3 Lock Service
grpc::Status tryAddLock(
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Tantivy/TiCIReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ struct TiCIReadTaskPool
bool is_count;
std::shared_ptr<rust::Box<ShardsSnapshot>> shards_snapshot;

public:
static std::tuple<::Expr, std::vector<ColumnID>> tipbToTiCIExpr(
const tipb::Expr & expr,
const TimezoneInfo & timezone_info)
Expand Down