Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions dbms/src/Storages/KVStore/TiKVHelpers/PDTiKVClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ namespace FailPoints
extern const char force_pd_grpc_error[];
} // namespace FailPoints

// Selects how a GC safepoint lookup interacts with the shared per-keyspace cache.
// NOTE(review): the strategy is checked inside the `!ignore_cache` branch of
// getGCSafePointWithRetry, so it appears to have no effect when the caller
// passes ignore_cache = true -- confirm against the full helper.
enum class GCSafepointFetchStrategy
{
    // Serve the cached safepoint as-is, regardless of its age, and never refresh it.
    // When nothing has been cached for the keyspace yet, the lookup yields 0;
    // advancing the cache is left to the existing background / GC paths.
    CacheOnly,
    // Original behavior: reuse the cache entry while it is still valid,
    // otherwise fetch from PD and store the result in the shared cache.
    UpdateCacheIfNeeded,
};

// The GC safepoint and its update time for a keyspace.
struct KeyspaceGCInfo
{
Expand Down Expand Up @@ -174,7 +184,8 @@ struct PDClientHelper
KeyspaceID keyspace_id,
bool ignore_cache = true,
Int64 safe_point_update_interval_seconds = 30,
Int64 safe_point_get_max_backoff_ms = 120000)
Int64 safe_point_get_max_backoff_ms = 120000,
GCSafepointFetchStrategy fetch_strategy = GCSafepointFetchStrategy::UpdateCacheIfNeeded)
{
UInt64 backoff_count = 0;
auto observe_backoff_count = [&](bool success) {
Expand All @@ -186,18 +197,31 @@ struct PDClientHelper

if (!ignore_cache)
{
// In order to avoid too frequent requests to PD,
// we cache the safe point for a while.
// at least one second
const auto min_interval = std::max(static_cast<Int64>(1), safe_point_update_interval_seconds);
auto ks_gc_info = ks_gc_sp_map.getGCSafepointIfValid(keyspace_id, min_interval);
if (ks_gc_info.has_value())
if (fetch_strategy == GCSafepointFetchStrategy::CacheOnly)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that if the caller passes ignore_cache = true together with fetch_strategy = CacheOnly, the CacheOnly strategy will be ignored, because the strategy is only checked inside the `if (!ignore_cache)` branch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should remove the `ignore_cache` parameter entirely?

{
// Still valid, return the cached gc safepoint
// Query paths use the last globally observed safepoint only. They never
// push the cache forward on their own, which avoids PD traffic growing
// linearly with query concurrency. Returning 0 on cache miss preserves
// the best-effort semantics expected by the caller.
auto ks_gc_info = ks_gc_sp_map.getGCSafepoint(keyspace_id);
observe_backoff_count(true);
return ks_gc_info->gc_safepoint;
return ks_gc_info.has_value() ? ks_gc_info->gc_safepoint : 0;
}
else
{
// In order to avoid too frequent requests to PD,
// we cache the safe point for a while.
// at least one second
const auto min_interval = std::max(static_cast<Int64>(1), safe_point_update_interval_seconds);
auto ks_gc_info = ks_gc_sp_map.getGCSafepointIfValid(keyspace_id, min_interval);
if (ks_gc_info.has_value())
{
// Still valid, return the cached gc safepoint
observe_backoff_count(true);
return ks_gc_info->gc_safepoint;
}
// else fallback to fetch from PD
}
// else fallback to fetch from PD
}

pingcap::kv::Backoffer bo(std::max(static_cast<Int64>(0), safe_point_get_max_backoff_ms));
Expand Down
191 changes: 191 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1101,4 +1101,195 @@ TEST(KeyspacesGCInfoTest, Basic)
ASSERT_EQ(info.getGCSafepoint(1), std::nullopt);
}

namespace
{

class CountingPDClient : public pingcap::pd::IClient
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a failpoint instead of constructing a whole CountingPDClient just for testing?

{
public:
// Builds a stub PD client that serves `gc_safe_point_` from every safepoint accessor below.
explicit CountingPDClient(uint64_t gc_safe_point_)
: gc_safe_point(gc_safe_point_)
{}

// Fixed timestamp; the value is a placeholder.
uint64_t getTS() override { return 0; }

// Region / store lookups are not implemented by this stub: they throw if reached.
pdpb::GetRegionResponse getRegionByKey(const std::string &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

pdpb::GetRegionResponse getRegionByID(uint64_t) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

metapb::Store getStore(uint64_t) override { throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

bool isClusterBootstrapped() override { return true; }

std::vector<metapb::Store> getAllStores(bool) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

// Both safepoint accessors return the configured value. Note that they do NOT
// bump gc_state_call_count; only getGCState() is counted.
uint64_t getGCSafePoint() override { return gc_safe_point; }

uint64_t getGCSafePointV2(KeyspaceID) override { return gc_safe_point; }

// Counts the call, then returns a response with an OK header (cluster id 1) and a
// keyspace-level GC state for `keyspace_id` whose txn safe point and GC safe point
// are both set to the configured gc_safe_point.
pdpb::GetGCStateResponse getGCState(KeyspaceID keyspace_id) override
{
// Tests read this counter to tell whether a lookup reached PD.
++gc_state_call_count;

pdpb::GetGCStateResponse gc_state;
auto * hdr = gc_state.mutable_header();
hdr->set_cluster_id(1);
hdr->mutable_error()->set_type(pdpb::ErrorType::OK);
auto * state = gc_state.mutable_gc_state();
state->mutable_keyspace_scope()->set_keyspace_id(keyspace_id);
state->set_is_keyspace_level_gc(true);
state->set_txn_safe_point(gc_safe_point);
state->set_gc_safe_point(gc_safe_point);
return gc_state;
}

// Remaining keyspace / membership APIs are not implemented by this stub: they throw if reached.
pdpb::GetAllKeyspacesGCStatesResponse getAllKeyspacesGCStates() override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

KeyspaceID getKeyspaceID(const std::string &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

void update(const std::vector<std::string> &, const pingcap::ClusterConfig &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

// NOTE(review): reports a non-mock client; presumably so that code which
// short-circuits on mock clients still exercises the real fetch path -- confirm.
bool isMock() override { return false; }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this return false here?


// Leader URL and all resource-group APIs are not implemented by this stub: they throw if reached.
std::string getLeaderUrl() override { throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

resource_manager::ListResourceGroupsResponse listResourceGroups(const resource_manager::ListResourceGroupsRequest &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

resource_manager::GetResourceGroupResponse getResourceGroup(const resource_manager::GetResourceGroupRequest &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

resource_manager::PutResourceGroupResponse addResourceGroup(const resource_manager::PutResourceGroupRequest &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

resource_manager::PutResourceGroupResponse modifyResourceGroup(const resource_manager::PutResourceGroupRequest &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

resource_manager::DeleteResourceGroupResponse deleteResourceGroup(const resource_manager::DeleteResourceGroupRequest &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

resource_manager::TokenBucketsResponse acquireTokenBuckets(const resource_manager::TokenBucketsRequest &) override
{
throw pingcap::Exception("not implemented", pingcap::ErrorCodes::UnknownError);
}

// Safepoint value handed out by the safepoint accessors and getGCState().
// Left public so a test can change it between calls.
uint64_t gc_safe_point;
// Number of getGCState() invocations observed so far.
size_t gc_state_call_count = 0;
};

} // namespace

// A CacheOnly lookup must never reach PD: it reports 0 while the keyspace has no
// cached safepoint, and serves the cached value once another caller has stored one.
TEST(PDClientHelperTest, CacheOnlyReadPathDoesNotFetchFromPD)
{
    constexpr KeyspaceID ks = 9527;
    // Make sure no entry for this keyspace is left over from elsewhere.
    PDClientHelper::removeKeyspaceGCSafepoint(ks);

    auto client = std::make_shared<CountingPDClient>(123456);

    // Every lookup in this test shares the same interval / backoff and keeps the
    // cache enabled; only the fetch strategy differs between calls.
    auto lookup = [&](GCSafepointFetchStrategy strategy) {
        return PDClientHelper::getGCSafePointWithRetry(
            client,
            ks,
            /* ignore_cache= */ false,
            /* safe_point_update_interval_seconds= */ 30,
            /* safe_point_get_max_backoff_ms= */ 1000,
            strategy);
    };

    // Cold cache + CacheOnly: the answer is 0 and PD is not contacted.
    const auto on_miss = lookup(GCSafepointFetchStrategy::CacheOnly);
    ASSERT_EQ(on_miss, 0);
    ASSERT_EQ(client->gc_state_call_count, 0);

    // The default strategy (used by non-query callers) fetches from PD and fills the cache.
    const auto refreshed = lookup(GCSafepointFetchStrategy::UpdateCacheIfNeeded);
    ASSERT_EQ(refreshed, 123456);
    ASSERT_EQ(client->gc_state_call_count, 1);

    // Warm cache + CacheOnly: the cached value is returned without another PD call.
    const auto on_hit = lookup(GCSafepointFetchStrategy::CacheOnly);
    ASSERT_EQ(on_hit, 123456);
    ASSERT_EQ(client->gc_state_call_count, 1);

    PDClientHelper::removeKeyspaceGCSafepoint(ks);
}
Comment thread
CalvinNeo marked this conversation as resolved.

// A CacheOnly lookup keeps returning the last cached safepoint even after the
// entry has outlived the update interval; only the default strategy refreshes it.
TEST(PDClientHelperTest, CacheOnlyReadPathCanReturnExpiredCache)
{
    constexpr KeyspaceID ks = 9528;
    // Make sure no entry for this keyspace is left over from elsewhere.
    PDClientHelper::removeKeyspaceGCSafepoint(ks);

    auto client = std::make_shared<CountingPDClient>(223344);

    // Lookups here differ only in the update interval and the fetch strategy.
    auto lookup = [&](int interval_seconds, GCSafepointFetchStrategy strategy) {
        return PDClientHelper::getGCSafePointWithRetry(
            client,
            ks,
            /* ignore_cache= */ false,
            /* safe_point_update_interval_seconds= */ interval_seconds,
            /* safe_point_get_max_backoff_ms= */ 1000,
            strategy);
    };

    // Populate the cache through the default (refreshing) strategy.
    const auto initial = lookup(30, GCSafepointFetchStrategy::UpdateCacheIfNeeded);
    ASSERT_EQ(initial, 223344);
    ASSERT_EQ(client->gc_state_call_count, 1);

    // Let the entry age past the 1-second interval used below (the helper clamps
    // the interval to at least one second, so a shorter wait would not do).
    std::this_thread::sleep_for(std::chrono::seconds(2));

    // Stale cache + CacheOnly: the old value is still served and PD is not contacted.
    const auto stale = lookup(1, GCSafepointFetchStrategy::CacheOnly);
    ASSERT_EQ(stale, 223344);
    ASSERT_EQ(client->gc_state_call_count, 1);

    // Stale cache + default strategy: the new PD value is fetched and returned.
    client->gc_safe_point = 223355;
    const auto advanced = lookup(1, GCSafepointFetchStrategy::UpdateCacheIfNeeded);
    ASSERT_EQ(advanced, 223355);
    ASSERT_EQ(client->gc_state_call_count, 2);

    PDClientHelper::removeKeyspaceGCSafepoint(ks);
}

} // namespace DB::tests
6 changes: 5 additions & 1 deletion dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,12 +723,16 @@ void checkStartTs(UInt64 start_ts, const Context & context, const String & req_i
auto pd_client = tmt.getPDClient();
if (unlikely(pd_client->isMock()))
return;
// Query paths only consume the last safepoint observed by TiFlash. Cache
// advancement is still handled by the existing non-query callers so that
// PD traffic does not scale with read concurrency.
auto safe_point = PDClientHelper::getGCSafePointWithRetry(
pd_client,
keyspace_id,
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds,
context.getSettingsRef().safe_point_get_max_backoff_ms);
context.getSettingsRef().safe_point_get_max_backoff_ms,
GCSafepointFetchStrategy::CacheOnly);
if (start_ts < safe_point)
{
throw TiFlashException(
Expand Down