From 76845adb92ffa3e5b77c456f7049d60f0ddd1c93 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 28 May 2026 22:49:04 +0800 Subject: [PATCH] kv: support transactional delete mutations Signed-off-by: lance6716 --- ci/build-test.sh | 14 ++++++++- include/pingcap/kv/2pc.h | 17 ++++++++-- include/pingcap/kv/Txn.h | 12 +++++--- src/kv/2pc.cc | 13 +++++--- src/test/region_split_test.cc | 58 +++++++++++++++++++++++++++++++++++ 5 files changed, 102 insertions(+), 12 deletions(-) diff --git a/ci/build-test.sh b/ci/build-test.sh index 87c26f15..d9203979 100755 --- a/ci/build-test.sh +++ b/ci/build-test.sh @@ -16,8 +16,20 @@ make -j $NPROC nohup /mock-tikv/bin/mock-tikv & mock_kv_pid=$! +for i in {1..100}; do + if (echo >/dev/tcp/127.0.0.1/2378) >/dev/null 2>&1; then + break + fi + sleep 0.1 +done + +if ! (echo >/dev/tcp/127.0.0.1/2378) >/dev/null 2>&1; then + echo "mock-tikv did not start listening on 127.0.0.1:2378" + kill -9 $mock_kv_pid || true + exit 1 +fi + cd "$build_dir" && make test kill -9 $mock_kv_pid - diff --git a/include/pingcap/kv/2pc.h b/include/pingcap/kv/2pc.h index 416abbe4..b8c7c80d 100644 --- a/include/pingcap/kv/2pc.h +++ b/include/pingcap/kv/2pc.h @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -21,6 +22,18 @@ constexpr uint32_t txnCommitBatchSize = 16 * 1024; struct Txn; +struct TxnMutation +{ + ::kvrpcpb::Op op; + std::string value; + + static TxnMutation Put(const std::string & value_) { return TxnMutation{::kvrpcpb::Put, value_}; } + + static TxnMutation Del() { return TxnMutation{::kvrpcpb::Del, ""}; } + + uint64_t valueSize() const { return op == ::kvrpcpb::Put ? value.size() : 0; } +}; + struct TwoPhaseCommitter; using TwoPhaseCommitterPtr = std::shared_ptr; @@ -82,7 +95,7 @@ class TTLManager struct TwoPhaseCommitter : public std::enable_shared_from_this { private: - std::unordered_map mutations; + std::unordered_map mutations; std::vector keys; uint64_t start_ts = 0; @@ -169,7 +182,7 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this; +using Buffer = std::map; // Txn supports transaction operation for TiKV. // Note that this implementation is only used for TEST right now. @@ -42,14 +42,18 @@ struct Txn committer->execute(); } - void set(const std::string & key, const std::string & value) { buffer.emplace(key, value); } + void set(const std::string & key, const std::string & value) { buffer.insert_or_assign(key, TxnMutation::Put(value)); } + + void del(const std::string & key) { buffer.insert_or_assign(key, TxnMutation::Del()); } std::pair get(const std::string & key) { auto it = buffer.find(key); if (it != buffer.end()) { - return std::make_pair(it->second, true); + if (it->second.op == ::kvrpcpb::Del) + return std::make_pair("", false); + return std::make_pair(it->second.value, true); } Snapshot snapshot(cluster, start_ts); std::string value = snapshot.Get(key); @@ -58,7 +62,7 @@ struct Txn return std::make_pair(value, true); } - void walkBuffer(std::function foo) + void walkBuffer(std::function foo) { for (auto & it : buffer) { diff --git a/src/kv/2pc.cc b/src/kv/2pc.cc index 0514d5be..5ecbb31a 100644 --- a/src/kv/2pc.cc +++ b/src/kv/2pc.cc @@ -44,9 +44,9 @@ TwoPhaseCommitter::TwoPhaseCommitter(Txn * txn, bool _use_async_commit) , log(&Logger::get("pingcap.tikv")) { commited = false; - txn->walkBuffer([&](const std::string & key, const std::string & value) { + txn->walkBuffer([&](const std::string & key, const TxnMutation & mutation) { keys.push_back(key); - mutations.emplace(key, value); + mutations.insert_or_assign(key, mutation); }); cluster = txn->cluster; start_ts = txn->start_ts; @@ -138,8 +138,11 @@ void TwoPhaseCommitter::prewriteSingleBatch(Backoffer & bo, const BatchKeys & ba for (const std::string & key : batch.keys) { auto * mut = req.add_mutations(); + const auto & mutation = mutations.at(key); + mut->set_op(mutation.op); mut->set_key(key); - mut->set_value(mutations[key]); + if (mutation.op == ::kvrpcpb::Put) + mut->set_value(mutation.value); } req.set_primary_lock(primary_lock); req.set_start_version(start_ts); @@ -150,9 +153,9 @@ void TwoPhaseCommitter::prewriteSingleBatch(Backoffer & bo, const BatchKeys & ba { if (batch.is_primary) { - for (auto & [k, v] : mutations) + for (auto & [k, mutation] : mutations) { - (void)v; + (void)mutation; if (k == primary_lock) continue; auto * secondary = req.add_secondaries(); diff --git a/src/test/region_split_test.cc b/src/test/region_split_test.cc index 242eaf7e..858b3559 100644 --- a/src/test/region_split_test.cc +++ b/src/test/region_split_test.cc @@ -76,6 +76,64 @@ TEST_F(TestWithMockKVRegionSplit, testSplitRegionGet) } } +TEST_F(TestWithMockKVRegionSplit, testTxnDelete) +{ + { + Txn txn(test_cluster.get()); + txn.set("delete_test_key", "delete_test_value"); + txn.commit(); + } + + { + Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS()); + ASSERT_EQ(snap.Get("delete_test_key"), "delete_test_value"); + } + + { + Txn txn(test_cluster.get()); + txn.del("delete_test_key"); + auto buffered_result = txn.get("delete_test_key"); + ASSERT_FALSE(buffered_result.second); + ASSERT_EQ(buffered_result.first, ""); + txn.commit(); + } + + { + Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS()); + ASSERT_EQ(snap.Get("delete_test_key"), ""); + } + + { + Txn txn(test_cluster.get()); + txn.set("delete_test_key_2", "v1"); + txn.del("delete_test_key_2"); + auto buffered_result = txn.get("delete_test_key_2"); + ASSERT_FALSE(buffered_result.second); + ASSERT_EQ(buffered_result.first, ""); + txn.commit(); + } + + { + Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS()); + ASSERT_EQ(snap.Get("delete_test_key_2"), ""); + } + + { + Txn txn(test_cluster.get()); + txn.del("delete_test_key_3"); + txn.set("delete_test_key_3", "v2"); + auto buffered_result = txn.get("delete_test_key_3"); + ASSERT_TRUE(buffered_result.second); + ASSERT_EQ(buffered_result.first, "v2"); + txn.commit(); + } + + { + Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS()); + ASSERT_EQ(snap.Get("delete_test_key_3"), "v2"); + } +} + TEST_F(TestWithMockKVRegionSplit, testSplitRegionScan) { Txn txn(test_cluster.get());