Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion ci/build-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,20 @@ make -j $NPROC
nohup /mock-tikv/bin/mock-tikv &
mock_kv_pid=$!

for i in {1..100}; do
if (echo >/dev/tcp/127.0.0.1/2378) >/dev/null 2>&1; then
break
fi
sleep 0.1
done

if ! (echo >/dev/tcp/127.0.0.1/2378) >/dev/null 2>&1; then
echo "mock-tikv did not start listening on 127.0.0.1:2378"
kill -9 $mock_kv_pid || true
exit 1
fi

cd "$build_dir" && make test

kill -9 $mock_kv_pid


17 changes: 15 additions & 2 deletions include/pingcap/kv/2pc.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <cmath>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
Expand All @@ -21,6 +22,18 @@ constexpr uint32_t txnCommitBatchSize = 16 * 1024;

struct Txn;

struct TxnMutation
{
::kvrpcpb::Op op;
std::string value;

static TxnMutation Put(const std::string & value_) { return TxnMutation{::kvrpcpb::Put, value_}; }

static TxnMutation Del() { return TxnMutation{::kvrpcpb::Del, ""}; }

uint64_t valueSize() const { return op == ::kvrpcpb::Put ? value.size() : 0; }
};

struct TwoPhaseCommitter;

using TwoPhaseCommitterPtr = std::shared_ptr<TwoPhaseCommitter>;
Expand Down Expand Up @@ -82,7 +95,7 @@ class TTLManager
struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter>
{
private:
std::unordered_map<std::string, std::string> mutations;
std::unordered_map<std::string, TxnMutation> mutations;

std::vector<std::string> keys;
uint64_t start_ts = 0;
Expand Down Expand Up @@ -169,7 +182,7 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter
auto & key = group.second[end];
size += key.size();
if constexpr (action == ActionPrewrite)
size += mutations[key].size();
size += mutations.at(key).valueSize();

if (key == primary_lock)
primary_idx = batches.size();
Expand Down
12 changes: 8 additions & 4 deletions include/pingcap/kv/Txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace pingcap
{
namespace kv
{
using Buffer = std::map<std::string, std::string>;
using Buffer = std::map<std::string, TxnMutation>;

// Txn supports transaction operation for TiKV.
// Note that this implementation is only used for TEST right now.
Expand Down Expand Up @@ -42,14 +42,18 @@ struct Txn
committer->execute();
}

void set(const std::string & key, const std::string & value) { buffer.emplace(key, value); }
void set(const std::string & key, const std::string & value) { buffer.insert_or_assign(key, TxnMutation::Put(value)); }

void del(const std::string & key) { buffer.insert_or_assign(key, TxnMutation::Del()); }

std::pair<std::string, bool> get(const std::string & key)
{
auto it = buffer.find(key);
if (it != buffer.end())
{
return std::make_pair(it->second, true);
if (it->second.op == ::kvrpcpb::Del)
return std::make_pair("", false);
return std::make_pair(it->second.value, true);
}
Snapshot snapshot(cluster, start_ts);
std::string value = snapshot.Get(key);
Expand All @@ -58,7 +62,7 @@ struct Txn
return std::make_pair(value, true);
}

void walkBuffer(std::function<void(const std::string &, const std::string &)> foo)
void walkBuffer(std::function<void(const std::string &, const TxnMutation &)> foo)
{
for (auto & it : buffer)
{
Expand Down
13 changes: 8 additions & 5 deletions src/kv/2pc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ TwoPhaseCommitter::TwoPhaseCommitter(Txn * txn, bool _use_async_commit)
, log(&Logger::get("pingcap.tikv"))
{
commited = false;
txn->walkBuffer([&](const std::string & key, const std::string & value) {
txn->walkBuffer([&](const std::string & key, const TxnMutation & mutation) {
keys.push_back(key);
mutations.emplace(key, value);
mutations.insert_or_assign(key, mutation);
});
cluster = txn->cluster;
start_ts = txn->start_ts;
Expand Down Expand Up @@ -138,8 +138,11 @@ void TwoPhaseCommitter::prewriteSingleBatch(Backoffer & bo, const BatchKeys & ba
for (const std::string & key : batch.keys)
{
auto * mut = req.add_mutations();
const auto & mutation = mutations.at(key);
mut->set_op(mutation.op);
mut->set_key(key);
mut->set_value(mutations[key]);
if (mutation.op == ::kvrpcpb::Put)
mut->set_value(mutation.value);
}
req.set_primary_lock(primary_lock);
req.set_start_version(start_ts);
Expand All @@ -150,9 +153,9 @@ void TwoPhaseCommitter::prewriteSingleBatch(Backoffer & bo, const BatchKeys & ba
{
if (batch.is_primary)
{
for (auto & [k, v] : mutations)
for (auto & [k, mutation] : mutations)
{
(void)v;
(void)mutation;
if (k == primary_lock)
continue;
auto * secondary = req.add_secondaries();
Expand Down
58 changes: 58 additions & 0 deletions src/test/region_split_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,64 @@ TEST_F(TestWithMockKVRegionSplit, testSplitRegionGet)
}
}

TEST_F(TestWithMockKVRegionSplit, testTxnDelete)
{
{
Txn txn(test_cluster.get());
txn.set("delete_test_key", "delete_test_value");
txn.commit();
}

{
Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS());
ASSERT_EQ(snap.Get("delete_test_key"), "delete_test_value");
}

{
Txn txn(test_cluster.get());
txn.del("delete_test_key");
auto buffered_result = txn.get("delete_test_key");
ASSERT_FALSE(buffered_result.second);
ASSERT_EQ(buffered_result.first, "");
txn.commit();
}

{
Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS());
ASSERT_EQ(snap.Get("delete_test_key"), "");
}

{
Txn txn(test_cluster.get());
txn.set("delete_test_key_2", "v1");
txn.del("delete_test_key_2");
auto buffered_result = txn.get("delete_test_key_2");
ASSERT_FALSE(buffered_result.second);
ASSERT_EQ(buffered_result.first, "");
txn.commit();
}

{
Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS());
ASSERT_EQ(snap.Get("delete_test_key_2"), "");
}

{
Txn txn(test_cluster.get());
txn.del("delete_test_key_3");
txn.set("delete_test_key_3", "v2");
auto buffered_result = txn.get("delete_test_key_3");
ASSERT_TRUE(buffered_result.second);
ASSERT_EQ(buffered_result.first, "v2");
txn.commit();
}

{
Snapshot snap(test_cluster.get(), test_cluster->pd_client->getTS());
ASSERT_EQ(snap.Get("delete_test_key_3"), "v2");
}
}
Comment on lines +101 to +135

TEST_F(TestWithMockKVRegionSplit, testSplitRegionScan)
{
Txn txn(test_cluster.get());
Expand Down
Loading