diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index be91021b067..515550bb0f4 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -186,7 +186,6 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer gc_safe_point = PDClientHelper::getGCSafePointWithRetry( context.getTMTContext().getPDClient(), NullspaceID, - true, 30, context.getSettingsRef().safe_point_get_max_backoff_ms); if (!args.empty()) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 1023f775cad..fdf8be87387 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -439,7 +439,6 @@ bool DeltaMergeStore::updateGCSafePoint() auto safe_point = PDClientHelper::getGCSafePointWithRetry( pd_client, keyspace_id, - /* ignore_cache= */ false, global_context.getSettingsRef().safe_point_update_interval_seconds, global_context.getSettingsRef().safe_point_get_max_backoff_ms); latest_gc_safe_point.store(safe_point, std::memory_order_release); diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index f30615ea003..fece91eb0ac 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -728,7 +728,6 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( gc_safepoint = PDClientHelper::getGCSafePointWithRetry( pd_client, keyspace_id, - /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds, context.getSettingsRef().safe_point_get_max_backoff_ms); } diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h b/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h index d984ea703e0..2685209249f 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h @@ -46,6 +46,16 @@ namespace FailPoints extern const char force_pd_grpc_error[]; } // namespace FailPoints +enum class GCSafepointFetchStrategy +{ + // Return the cached value directly. If the keyspace has not been refreshed yet, + // return 0 and leave cache advancement to the existing background / GC paths. + CacheOnly, + // Keep the original behavior: use a valid cache entry when possible, otherwise + // refresh from PD and update the shared cache. + UpdateCacheIfNeeded, +}; + // The GC safepoint and its update time for a keyspace. struct KeyspaceGCInfo { @@ -172,9 +182,9 @@ struct PDClientHelper static Timestamp getGCSafePointWithRetry( const pingcap::pd::ClientPtr & pd_client, KeyspaceID keyspace_id, - bool ignore_cache = true, Int64 safe_point_update_interval_seconds = 30, - Int64 safe_point_get_max_backoff_ms = 120000) + Int64 safe_point_get_max_backoff_ms = 120000, + GCSafepointFetchStrategy fetch_strategy = GCSafepointFetchStrategy::UpdateCacheIfNeeded) { UInt64 backoff_count = 0; auto observe_backoff_count = [&](bool success) { @@ -184,7 +194,17 @@ struct PDClientHelper GET_METRIC(tiflash_gc_safepoint_backoff_count, type_failure).Observe(backoff_count); }; - if (!ignore_cache) + if (fetch_strategy == GCSafepointFetchStrategy::CacheOnly) + { + // Query paths use the last globally observed safepoint only. They never + // push the cache forward on their own, which avoids PD traffic growing + // linearly with query concurrency. Returning 0 on cache miss preserves + // the best-effort semantics expected by the caller. + auto ks_gc_info = ks_gc_sp_map.getGCSafepoint(keyspace_id); + observe_backoff_count(true); + return ks_gc_info.has_value() ? ks_gc_info->gc_safepoint : 0; + } + else { // In order to avoid too frequent requests to PD, // we cache the safe point for a while. diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index e7fd344bc3a..28f86bb8abb 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -1101,4 +1101,202 @@ TEST(KeyspacesGCInfoTest, Basic) ASSERT_EQ(info.getGCSafepoint(1), std::nullopt); } +namespace +{ + +class CountingPDClient : public pingcap::pd::IClient +{ +public: + explicit CountingPDClient(uint64_t gc_safe_point_) + : gc_safe_point(gc_safe_point_) + {} + + uint64_t getClusterID() override { return 1; } + + uint64_t getTS() override { return 0; } + + pdpb::GetRegionResponse getRegionByKey(const std::string &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + pdpb::GetRegionResponse getRegionByID(uint64_t) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + metapb::Store getStore(uint64_t) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + bool isClusterBootstrapped() override { return true; } + + std::vector getAllStores(bool) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + uint64_t getGCSafePoint() override { return gc_safe_point; } + + uint64_t getGCSafePointV2(KeyspaceID) override { return gc_safe_point; } + + pdpb::GetGCStateResponse getGCState(KeyspaceID keyspace_id) override + { + ++gc_state_call_count; + + pdpb::GetGCStateResponse gc_state; + auto * hdr = gc_state.mutable_header(); + hdr->set_cluster_id(1); + hdr->mutable_error()->set_type(pdpb::ErrorType::OK); + auto * state = gc_state.mutable_gc_state(); + state->mutable_keyspace_scope()->set_keyspace_id(keyspace_id); + state->set_is_keyspace_level_gc(true); + state->set_txn_safe_point(gc_safe_point); + state->set_gc_safe_point(gc_safe_point); + return gc_state; + } + + pdpb::GetAllKeyspacesGCStatesResponse getAllKeyspacesGCStates() override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + KeyspaceID getKeyspaceID(const std::string &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + void update(const std::vector &, const pingcap::ClusterConfig &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + bool isMock() override { return false; } + + std::string getLeaderUrl() override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + resource_manager::ListResourceGroupsResponse listResourceGroups( + const resource_manager::ListResourceGroupsRequest &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + resource_manager::GetResourceGroupResponse getResourceGroup( + const resource_manager::GetResourceGroupRequest &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + resource_manager::PutResourceGroupResponse addResourceGroup( + const resource_manager::PutResourceGroupRequest &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + resource_manager::PutResourceGroupResponse modifyResourceGroup( + const resource_manager::PutResourceGroupRequest &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + resource_manager::DeleteResourceGroupResponse deleteResourceGroup( + const resource_manager::DeleteResourceGroupRequest &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + resource_manager::TokenBucketsResponse acquireTokenBuckets(const resource_manager::TokenBucketsRequest &) override + { + throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); + } + + uint64_t gc_safe_point; + size_t gc_state_call_count = 0; +}; + +} // namespace + +TEST(PDClientHelperTest, CacheOnlyReadPathDoesNotFetchFromPD) +{ + constexpr KeyspaceID keyspace_id = 9527; + PDClientHelper::removeKeyspaceGCSafepoint(keyspace_id); + + auto pd_client = std::make_shared(123456); + + // Read path should not warm the cache on a miss. + auto safe_point = PDClientHelper::getGCSafePointWithRetry( + pd_client, + keyspace_id, + /* safe_point_update_interval_seconds= */ 30, + /* safe_point_get_max_backoff_ms= */ 1000, + GCSafepointFetchStrategy::CacheOnly); + ASSERT_EQ(safe_point, 0); + ASSERT_EQ(pd_client->gc_state_call_count, 0); + + // Existing non-query paths still refresh the cache from PD. + safe_point = PDClientHelper::getGCSafePointWithRetry( + pd_client, + keyspace_id, + /* safe_point_update_interval_seconds= */ 30, + /* safe_point_get_max_backoff_ms= */ 1000); + ASSERT_EQ(safe_point, 123456); + ASSERT_EQ(pd_client->gc_state_call_count, 1); + + // Once refreshed by other paths, read path consumes the cached value only. + safe_point = PDClientHelper::getGCSafePointWithRetry( + pd_client, + keyspace_id, + /* safe_point_update_interval_seconds= */ 30, + /* safe_point_get_max_backoff_ms= */ 1000, + GCSafepointFetchStrategy::CacheOnly); + ASSERT_EQ(safe_point, 123456); + ASSERT_EQ(pd_client->gc_state_call_count, 1); + + PDClientHelper::removeKeyspaceGCSafepoint(keyspace_id); +} + +TEST(PDClientHelperTest, CacheOnlyReadPathCanReturnExpiredCache) +{ + constexpr KeyspaceID keyspace_id = 9528; + PDClientHelper::removeKeyspaceGCSafepoint(keyspace_id); + + auto pd_client = std::make_shared(223344); + + auto safe_point = PDClientHelper::getGCSafePointWithRetry( + pd_client, + keyspace_id, + /* safe_point_update_interval_seconds= */ 30, + /* safe_point_get_max_backoff_ms= */ 1000); + ASSERT_EQ(safe_point, 223344); + ASSERT_EQ(pd_client->gc_state_call_count, 1); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // The read path keeps using the last observed safepoint even after the cache is + // stale. Only non-query callers are allowed to advance it via PD. + safe_point = PDClientHelper::getGCSafePointWithRetry( + pd_client, + keyspace_id, + /* safe_point_update_interval_seconds= */ 1, + /* safe_point_get_max_backoff_ms= */ 1000, + GCSafepointFetchStrategy::CacheOnly); + ASSERT_EQ(safe_point, 223344); + ASSERT_EQ(pd_client->gc_state_call_count, 1); + + pd_client->gc_safe_point = 223355; + safe_point = PDClientHelper::getGCSafePointWithRetry( + pd_client, + keyspace_id, + /* safe_point_update_interval_seconds= */ 1, + /* safe_point_get_max_backoff_ms= */ 1000); + ASSERT_EQ(safe_point, 223355); + ASSERT_EQ(pd_client->gc_state_call_count, 2); + + PDClientHelper::removeKeyspaceGCSafepoint(keyspace_id); +} + } // namespace DB::tests diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 76aa9b31edb..e062acc4e34 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -723,12 +723,15 @@ void checkStartTs(UInt64 start_ts, const Context & context, const String & req_i auto pd_client = tmt.getPDClient(); if (unlikely(pd_client->isMock())) return; + // Query paths only consume the last safepoint observed by TiFlash. Cache + // advancement is still handled by the existing non-query callers so that + // PD traffic does not scale with read concurrency. auto safe_point = PDClientHelper::getGCSafePointWithRetry( pd_client, keyspace_id, - /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds, - context.getSettingsRef().safe_point_get_max_backoff_ms); + context.getSettingsRef().safe_point_get_max_backoff_ms, + GCSafepointFetchStrategy::CacheOnly); if (start_ts < safe_point) { throw TiFlashException( diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.cpp b/dbms/src/TiDB/Schema/SchemaSyncService.cpp index 52a51560262..33955034262 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.cpp +++ b/dbms/src/TiDB/Schema/SchemaSyncService.cpp @@ -105,7 +105,6 @@ void SchemaSyncService::addKeyspaceGCTasks() auto gc_safe_point = PDClientHelper::getGCSafePointWithRetry( context.getTMTContext().getPDClient(), keyspace, - true, 30, context.getSettingsRef().safe_point_get_max_backoff_ms); done_anything = gc(gc_safe_point, keyspace);