Skip to content

Commit

Permalink
KVStore: remove write storage in doLearnerRead
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Feb 20, 2025
1 parent d0584a1 commit 5c67600
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 81 deletions.
76 changes: 7 additions & 69 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,15 @@ DM::WriteResult writeRegionDataToStorage(
}
}

std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfoPtr> resolveLocksAndReadRegionData(

std::variant<LockInfoPtr, RegionException::RegionReadStatus> RegionTable::checkRegionAndGetLocks(
const TiDB::TableID table_id,
const RegionPtr & region,
const Timestamp start_ts,
const std::unordered_set<UInt64> * bypass_lock_ts,
RegionVersion region_version,
RegionVersion conf_version,
bool resolve_locks,
bool need_data_value)
RegionVersion conf_version)
{
LockInfoPtr lock_info;

auto scanner = region->createCommittedScanner(true, need_data_value);

/// Some sanity checks for region meta.
{
/**
Expand All @@ -303,33 +298,12 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
table_id);
}

/// Deal with locks.
if (resolve_locks)
{
/// Check if there are any lock should be resolved, if so, throw LockException.
/// It will iterate all locks with in the time range.
lock_info = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
}

auto scanner = region->createCommittedScanner(true, /*need_data_value*/ false);
/// Get transaction locks that should be resolved in this region.
auto lock_info = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
if (lock_info)
return lock_info;

/// If there is no lock, leave scope of region scanner and raise LockException.
/// Read raw KVs from region cache.
RegionDataReadInfoList data_list_read;
// Shortcut for empty region.
if (!scanner.hasNext())
return data_list_read;

// If worked with raftstore v2, the final size may not equal to here.
data_list_read.reserve(scanner.writeMapSize());

// Tiny optimization for queries that need only handle, tso, delmark.
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
return RegionException::RegionReadStatus::OK;
}

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region)
Expand Down Expand Up @@ -456,42 +430,6 @@ DM::WriteResult RegionTable::writeCommittedByRegion(
return write_result;
}

RegionTable::ResolveLocksAndWriteRegionRes RegionTable::resolveLocksAndWriteRegion(
TMTContext & tmt,
const TiDB::TableID table_id,
const RegionPtr & region,
const Timestamp start_ts,
const std::unordered_set<UInt64> * bypass_lock_ts,
RegionVersion region_version,
RegionVersion conf_version,
const LoggerPtr & log)
{
auto region_data_lock = resolveLocksAndReadRegionData(
table_id,
region,
start_ts,
bypass_lock_ts,
region_version,
conf_version,
/* resolve_locks */ true,
/* need_data_value */ true);

return std::visit(
variant_op::overloaded{
[&](RegionDataReadInfoList & data_list_read) -> ResolveLocksAndWriteRegionRes {
if (data_list_read.empty())
return RegionException::RegionReadStatus::OK;
auto & context = tmt.getContext();
// There is no raft input here, so we can just ignore the fg flush request.
writeRegionDataToStorage(context, region, data_list_read, log);
RemoveRegionCommitCache(region, data_list_read);
return RegionException::RegionReadStatus::OK;
},
[](auto & r) -> ResolveLocksAndWriteRegionRes { return std::move(r); },
},
region_data_lock);
}

// Note that there could be a chance that the table have been totally removed from TiKV
// and TiFlash can not get the IStorage instance.
// - Check whether the StorageDeltaMerge is nullptr or not before you accessing to it.
Expand Down
10 changes: 3 additions & 7 deletions dbms/src/Storages/KVStore/Decode/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,14 @@ class RegionTable : private boost::noncopyable
const LoggerPtr & log,
bool lock_region = true);

/// Check transaction locks in region, and write committed data in it into storage engine if check passed. Otherwise throw an LockException.
/// The write logic is the same as #writeCommittedByRegion, with some extra checks about region version and conf_version.
using ResolveLocksAndWriteRegionRes = std::variant<LockInfoPtr, RegionException::RegionReadStatus>;
static ResolveLocksAndWriteRegionRes resolveLocksAndWriteRegion(
TMTContext & tmt,
/// Check region metas and get transaction locks in region.
static std::variant<LockInfoPtr, RegionException::RegionReadStatus> checkRegionAndGetLocks(
const TableID table_id,
const RegionPtr & region,
const Timestamp start_ts,
const std::unordered_set<UInt64> * bypass_lock_ts,
RegionVersion region_version,
RegionVersion conf_version,
const LoggerPtr & log);
RegionVersion conf_version);

void clear();

Expand Down
7 changes: 2 additions & 5 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,17 +406,14 @@ void LearnerReadWorker::waitIndex(
continue;
}

// Try to resolve locks and flush data into storage layer
const auto & physical_table_id = region_to_query.physical_table_id;
auto res = RegionTable::resolveLocksAndWriteRegion(
tmt,
auto res = RegionTable::checkRegionAndGetLocks(
physical_table_id,
region,
mvcc_query_info.start_ts,
region_to_query.bypass_lock_ts,
region_to_query.version,
region_to_query.conf_version,
log);
region_to_query.conf_version);

std::visit(
variant_op::overloaded{
Expand Down

0 comments on commit 5c67600

Please sign in to comment.