diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index fbd8e6f2099..4eb4ef4baa9 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit fbd8e6f209952d5bb79897d8bce17230acf23495 +Subproject commit 4eb4ef4baa94122fcae499037db1727dd09cfa59 diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index fb31e4476bb..c1394746e1a 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -410,20 +410,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( curr_region.handleWriteRaftCmd({}, index, term, tmt); - const auto check_sync_log = [&]() { - if (cmd_type != raft_cmdpb::AdminCmdType::CompactLog) - { - // ignore ComputeHash, VerifyHash or other useless cmd. - return false; - } - else - { - return canFlushRegionDataImpl(curr_region_ptr, true, /* try_until_succeed */ false, tmt, region_task_lock); - } - }; - - if (check_sync_log()) + if (cmd_type == raft_cmdpb::AdminCmdType::CompactLog) { + // Before CompactLog, we ought to make sure all data of this region are persisted. + // So proxy will firstly call an FFI `fn_try_flush_data` to trigger a attempt to flush data on TiFlash's side. + // If the attempt fails, Proxy will filter execution of this CompactLog, which means every CompactLog observed by TiFlash can ALWAYS succeed now. + // ref. https://github.com/pingcap/tidb-engine-ext/blob/e83a37d2d8d8ae1778fe279c5f06a851f8c9e56a/components/raftstore/src/engine_store_ffi/observer.rs#L175 return EngineStoreApplyRes::Persist; } return EngineStoreApplyRes::None; diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index 77aab06f6cf..e157d711f1d 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -46,6 +46,7 @@ class RegionKVStoreTest : public ::testing::Test static void testKVStore(); static void testRegion(); static void testReadIndex(); + static void testNewProxy(); private: static void testRaftSplit(KVStore & kvs, TMTContext & tmt); @@ -54,6 +55,66 @@ class RegionKVStoreTest : public ::testing::Test static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt); }; +void RegionKVStoreTest::testNewProxy() +{ + std::string path = TiFlashTestEnv::getTemporaryPath("/region_kvs_tmp") + "/basic"; + + Poco::File file(path); + if (file.exists()) + file.remove(true); + file.createDirectories(); + + auto ctx = TiFlashTestEnv::getContext( + DB::Settings(), + Strings{ + path, + }); + KVStore & kvs = *ctx.getTMTContext().getKVStore(); + MockRaftStoreProxy proxy_instance; + TiFlashRaftProxyHelper proxy_helper; + { + proxy_helper = MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance}); + proxy_instance.init(100); + } + kvs.restore(&proxy_helper); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + } + { + kvs.tryPersist(1); + kvs.gcRegionPersistedCache(Seconds{0}); + } + { + // test CompactLog + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + auto region = kvs.getRegion(1); + region->markCompactLog(); + kvs.setRegionCompactLogConfig(100000, 1000, 1000); + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + // CompactLog always returns true now, even if we can't do a flush. + // We use a tryFlushData to pre-filter. + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 5, 1, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + // Filter + ASSERT_EQ(kvs.tryFlushRegionData(1, false, ctx.getTMTContext()), false); + } +} + void RegionKVStoreTest::testReadIndex() { std::string path = TiFlashTestEnv::getTemporaryPath("/region_kvs_tmp") + "/basic"; @@ -968,20 +1029,17 @@ void RegionKVStoreTest::testKVStore() request.mutable_compact_log(); request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); - - raft_cmdpb::AdminRequest first_request = request; raft_cmdpb::AdminResponse first_response = response; - - ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(first_request), std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); raft_cmdpb::AdminResponse second_response = response; - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); - request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash); raft_cmdpb::AdminResponse third_response = response; ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(third_response), 7, 24, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); - request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash); raft_cmdpb::AdminResponse fourth_response = response; ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fourth_response), 7, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); @@ -1422,5 +1480,12 @@ try } CATCH +TEST_F(RegionKVStoreTest, NewProxy) +try +{ + testNewProxy(); +} +CATCH + } // namespace tests } // namespace DB