diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 6ea4d608b1c..0080c6866d8 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 6ea4d608b1c03fab89d17f54a2e399602231e27c +Subproject commit 0080c6866d8bbba32bdf198437dbc98cc078ea7e diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index f9d6d01955e..fb31e4476bb 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -327,6 +327,64 @@ void KVStore::persistRegion(const Region & region, const RegionTaskLock & region LOG_FMT_DEBUG(log, "Persist {} done", region.toString(false)); } +bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt) +{ + auto region_task_lock = region_manager.genRegionTaskLock(region_id); + const RegionPtr curr_region_ptr = getRegion(region_id); + return canFlushRegionDataImpl(curr_region_ptr, false, false, tmt, region_task_lock); +} + +bool KVStore::tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt) +{ + auto region_task_lock = region_manager.genRegionTaskLock(region_id); + const RegionPtr curr_region_ptr = getRegion(region_id); + return canFlushRegionDataImpl(curr_region_ptr, true, try_until_succeed, tmt, region_task_lock); +} + +bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock) +{ + if (curr_region_ptr == nullptr) + { + throw Exception(fmt::format("region not found when trying flush", ErrorCodes::LOGICAL_ERROR)); + } + auto & curr_region = *curr_region_ptr; + + auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo(); + + LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes); + + bool can_flush = false; + if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) + || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) + { + // if rows or bytes more than threshold, flush cache and persist mem data. + can_flush = true; + } + else + { + // if there is little data in mem, wait until time interval reached threshold. + // use random period so that lots of regions will not be persisted at same time. + auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT + can_flush = !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now()); + } + if (can_flush && flush_if_possible) + { + LOG_FMT_DEBUG(log, "{} flush region due to can_flush_data", curr_region.toString(false)); + if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed)) + { + persistRegion(curr_region, region_task_lock, "compact raft log"); + curr_region.markCompactLog(); + curr_region.cleanApproxMemCacheInfo(); + return true; + } + else + { + return false; + } + } + return can_flush; +} + EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( raft_cmdpb::AdminCmdType cmd_type, UInt64 curr_region_id, @@ -360,39 +418,13 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( } else { - auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo(); - - LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes); - - if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) - || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) - { - // if rows or bytes more than threshold, try to flush cache and persist mem data. - return true; - } - else - { - // if there is little data in mem, wait until time interval reached threshold. - // use random period so that lots of regions will not be persisted at same time. - auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT - return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now()); - } + return canFlushRegionDataImpl(curr_region_ptr, true, /* try_until_succeed */ false, tmt, region_task_lock); } }; if (check_sync_log()) { - if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false)) - { - persistRegion(curr_region, region_task_lock, "compact raft log"); - curr_region.markCompactLog(); - curr_region.cleanApproxMemCacheInfo(); - return EngineStoreApplyRes::Persist; - } - else - { - return EngineStoreApplyRes::None; - } + return EngineStoreApplyRes::Persist; } return EngineStoreApplyRes::None; } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 66e2fe32b75..b58083557a1 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -108,6 +108,9 @@ class KVStore final : private boost::noncopyable TMTContext & tmt); EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt); + bool needFlushRegionData(UInt64 region_id, TMTContext & tmt); + bool tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt); + void handleApplySnapshot(metapb::Region && region, uint64_t peer_id, const SSTViewVec, uint64_t index, uint64_t term, TMTContext & tmt); std::vector /* */ preHandleSnapshotToFiles( @@ -219,6 +222,11 @@ class KVStore final : private boost::noncopyable UInt64 term, TMTContext & tmt); + /// Notice that if flush_if_possible is set to false, we only check if a flush is allowed by rowsize/size/interval. + /// It will not check if a flush will eventually succeed. + /// In other words, `canFlushRegionDataImpl(flush_if_possible=true)` can return false. + bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock); + void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller); void releaseReadIndexWorkers(); void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 8a40ca9b15e..d4ba50d5714 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -128,6 +128,34 @@ EngineStoreApplyRes HandleAdminRaftCmd( } } +uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id) +{ + try + { + auto & kvstore = server->tmt->getKVStore(); + return kvstore->needFlushRegionData(region_id, *server->tmt); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + +uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed) +{ + try + { + auto & kvstore = server->tmt->getKVStore(); + return kvstore->tryFlushRegionData(region_id, until_succeed, *server->tmt); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + static_assert(sizeof(RaftStoreProxyFFIHelper) == sizeof(TiFlashRaftProxyHelper)); static_assert(alignof(RaftStoreProxyFFIHelper) == alignof(TiFlashRaftProxyHelper)); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index e1c01599275..aafe4b375eb 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -125,6 +125,8 @@ EngineStoreApplyRes HandleAdminRaftCmd( EngineStoreApplyRes HandleWriteRaftCmd(const EngineStoreServerWrap * server, WriteCmdsView cmds, RaftCmdHeader header); +uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id); +uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed); void AtomicUpdateProxy(EngineStoreServerWrap * server, RaftStoreProxyFFIHelper * proxy); void HandleDestroy(EngineStoreServerWrap * server, uint64_t region_id); EngineStoreApplyRes HandleIngestSST(EngineStoreServerWrap * server, SSTViewVec snaps, RaftCmdHeader header); @@ -158,6 +160,8 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper( .fn_gen_cpp_string = GenCppRawString, .fn_handle_write_raft_cmd = HandleWriteRaftCmd, .fn_handle_admin_raft_cmd = HandleAdminRaftCmd, + .fn_need_flush_data = NeedFlushData, + .fn_try_flush_data = TryFlushData, .fn_atomic_update_proxy = AtomicUpdateProxy, .fn_handle_destroy = HandleDestroy, .fn_handle_ingest_sst = HandleIngestSST, diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index 36a91522bb6..77aab06f6cf 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -1179,6 +1179,12 @@ void RegionKVStoreTest::testKVStore() ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin"); } } + { + // There shall be data to flush. + ASSERT_EQ(kvs.needFlushRegionData(19, ctx.getTMTContext()), true); + // Force flush until succeed only for testing. + ASSERT_EQ(kvs.tryFlushRegionData(19, true, ctx.getTMTContext()), true); + } } void test_mergeresult()