From 30f5313338a03d8a51af1cf176080cb99526b1ae Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 15 Sep 2022 12:05:54 +0800 Subject: [PATCH 1/2] refactor Signed-off-by: CalvinNeo --- components/engine_store_ffi/src/observer.rs | 3 +- components/proxy_server/src/config.rs | 2 +- .../proxy_server/src/hacked_lock_mgr.rs | 2 +- components/proxy_server/src/proxy.rs | 2 - components/proxy_server/src/run.rs | 14 ++--- components/proxy_server/src/setup.rs | 10 +--- components/raftstore/src/store/fsm/apply.rs | 2 +- engine_tiflash/src/engine.rs | 4 +- tests/proxy/normal.rs | 56 +++++++++---------- tests/proxy/proxy.rs | 11 ++-- 10 files changed, 44 insertions(+), 62 deletions(-) diff --git a/components/engine_store_ffi/src/observer.rs b/components/engine_store_ffi/src/observer.rs index 2a7b27d082b..4769208136e 100644 --- a/components/engine_store_ffi/src/observer.rs +++ b/components/engine_store_ffi/src/observer.rs @@ -131,7 +131,7 @@ impl TiFlashObserver { ) -> Self { let engine_store_server_helper = gen_engine_store_server_helper(engine.engine_store_server_helper); - // TODO(tiflash) start thread pool + // start thread pool for pre handle snapshot let snap_pool = Builder::new(tikv_util::thd_name!("region-task")) .max_thread_count(snap_handle_pool_size) .build_future_pool(); @@ -242,7 +242,6 @@ impl TiFlashObserver { impl Coprocessor for TiFlashObserver { fn stop(&self) { - // TODO(tiflash) remove this when pre apply merged info!("shutdown tiflash observer"; "peer_id" => self.peer_id); self.apply_snap_pool.as_ref().unwrap().shutdown(); } diff --git a/components/proxy_server/src/config.rs b/components/proxy_server/src/config.rs index 60ec1c2558c..e2d6bd0e4e5 100644 --- a/components/proxy_server/src/config.rs +++ b/components/proxy_server/src/config.rs @@ -3,7 +3,7 @@ use std::{ collections::{hash_map::RandomState, HashSet}, iter::FromIterator, - path::{Path, PathBuf}, + path::Path, }; use itertools::Itertools; diff --git a/components/proxy_server/src/hacked_lock_mgr.rs b/components/proxy_server/src/hacked_lock_mgr.rs index 43c99ec5e78..d7a8be8aafb 100644 --- a/components/proxy_server/src/hacked_lock_mgr.rs +++ b/components/proxy_server/src/hacked_lock_mgr.rs @@ -1,5 +1,5 @@ use tikv::{ - server::{lock_manager::waiter_manager::Callback, Error, Result}, + server::lock_manager::waiter_manager::Callback, storage::{ lock_manager::{DiagnosticContext, Lock, LockManager as LockManagerTrait, WaitTimeout}, ProcessResult, StorageCallback, diff --git a/components/proxy_server/src/proxy.rs b/components/proxy_server/src/proxy.rs index 7a6574c40c3..5ef2d09b01e 100644 --- a/components/proxy_server/src/proxy.rs +++ b/components/proxy_server/src/proxy.rs @@ -1,6 +1,5 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
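For reference on the components/engine_store_ffi/src/observer.rs hunk above: the reworded comment makes explicit that the pool built there is dedicated to pre-handling snapshots and is torn down in Coprocessor::stop. A minimal lifecycle sketch, assuming yatp's Builder/ThreadPool API exactly as invoked in that hunk; the function name, task body, and pool size are illustrative, not part of the patch.

    use yatp::Builder; // assumed import; the hunk only shows `Builder::new(...)`

    fn snap_pool_lifecycle_sketch(snap_handle_pool_size: usize) {
        // Built once in TiFlashObserver::new(): a dedicated pool for pre-handling snapshots.
        let snap_pool = Builder::new(tikv_util::thd_name!("region-task"))
            .max_thread_count(snap_handle_pool_size)
            .build_future_pool();

        // Pre-handle work is spawned onto the pool (illustrative body).
        snap_pool.spawn(async move {
            // decode the incoming snapshot and pre-apply it for TiFlash here
        });

        // Coprocessor::stop() shuts the pool down, as the hunk above shows.
        snap_pool.shutdown();
    }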
-#![feature(proc_macro_hygiene)] use std::{ ffi::CStr, os::raw::{c_char, c_int}, @@ -10,7 +9,6 @@ use std::{ use clap::{App, Arg, ArgMatches}; use tikv::config::TiKvConfig; -use tikv_util::config::ReadableDuration; use crate::{config::make_tikv_config, fatal, setup::overwrite_config_with_cmd_args}; diff --git a/components/proxy_server/src/run.rs b/components/proxy_server/src/run.rs index 8b4d8e54ade..5d2c5557412 100644 --- a/components/proxy_server/src/run.rs +++ b/components/proxy_server/src/run.rs @@ -43,15 +43,11 @@ use grpcio::{EnvBuilder, Environment}; use grpcio_health::HealthService; use kvproto::{ debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst, - kvrpcpb::ApiVersion, }; use pd_client::{PdClient, RpcClient}; use raft_log_engine::RaftLogEngine; use raftstore::{ - coprocessor::{ - config::SplitCheckConfigManager, BoxConsistencyCheckObserver, ConsistencyCheckMethod, - CoprocessorHost, RawConsistencyCheckObserver, RegionInfoAccessor, - }, + coprocessor::{config::SplitCheckConfigManager, CoprocessorHost, RegionInfoAccessor}, router::ServerRaftStoreRouter, store::{ config::RaftstoreConfigManager, @@ -75,7 +71,7 @@ use tikv::{ server::{ config::{Config as ServerConfig, ServerConfigManager}, create_raft_storage, - gc_worker::{AutoGcConfig, GcWorker}, + gc_worker::GcWorker, raftkv::ReplicaReadLockChecker, resolve, service::{DebugService, DiagnosticsService}, @@ -84,8 +80,7 @@ use tikv::{ GRPC_THREAD_PREFIX, }, storage::{ - self, config_manager::StorageConfigManger, mvcc::MvccConsistencyCheckObserver, - txn::flow_controller::FlowController, Engine, + self, config_manager::StorageConfigManger, txn::flow_controller::FlowController, Engine, }, }; use tikv_util::{ @@ -217,7 +212,7 @@ pub fn run_impl( #[inline] fn run_impl_only_for_decryption( config: TiKvConfig, - proxy_config: ProxyConfig, + _proxy_config: ProxyConfig, engine_store_server_helper: &EngineStoreServerHelper, ) { let encryption_key_manager = @@ -927,7 +922,6 @@ impl TiKvServer { data_sink_reg_handle.clone(), ); self.to_stop.push(single_target_worker); - let rsmeter_pubsub_service = resource_metering::PubSubService::new(data_sink_reg_handle); let cfg_manager = resource_metering::ConfigManager::new( self.config.resource_metering.clone(), diff --git a/components/proxy_server/src/setup.rs b/components/proxy_server/src/setup.rs index f316f2119b4..0254f9e4501 100644 --- a/components/proxy_server/src/setup.rs +++ b/components/proxy_server/src/setup.rs @@ -1,13 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
-use std::{ - borrow::ToOwned, - io, - path::{Path, PathBuf}, - sync::atomic::{AtomicBool, Ordering}, -}; - -use chrono::Local; +use std::borrow::ToOwned; + use clap::ArgMatches; use collections::HashMap; pub use server::setup::initial_logger; diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index fb80b9c70ef..a03b1a48af7 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1497,7 +1497,7 @@ where ); } - let (mut response, mut exec_result) = match cmd_type { + let (mut response, exec_result) = match cmd_type { AdminCmdType::ChangePeer => self.exec_change_peer(ctx, request), AdminCmdType::ChangePeerV2 => self.exec_change_peer_v2(ctx, request), AdminCmdType::Split => self.exec_split(ctx, request), diff --git a/engine_tiflash/src/engine.rs b/engine_tiflash/src/engine.rs index 0504fff2dfa..3dd3d81e401 100644 --- a/engine_tiflash/src/engine.rs +++ b/engine_tiflash/src/engine.rs @@ -1,5 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. - +// Disable warnings for unused engine_rocks's feature. +#![allow(dead_code)] +#![allow(unused_variables)] use std::{ any::Any, cell::RefCell, diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 2ca95471ec7..14096b6c801 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -16,8 +16,7 @@ use clap::{App, Arg, ArgMatches}; use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; use engine_traits::{ Error, ExternalSstFileInfo, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, - SstExt, SstReader, SstWriter, SstWriterBuilder, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, - CF_RAFT, CF_WRITE, + SstExt, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; use kvproto::{ import_sstpb::SstMeta, @@ -45,10 +44,7 @@ use proxy_server::{ run::run_tikv_proxy, }; use raft::eraftpb::MessageType; -use raftstore::{ - coprocessor::{ConsistencyCheckMethod, Coprocessor}, - store::util::find_peer, -}; +use raftstore::{coprocessor::ConsistencyCheckMethod, store::util::find_peer}; use sst_importer::SstImporter; pub use test_raftstore::{must_get_equal, must_get_none, new_peer}; use test_raftstore::{new_node_cluster, new_tikv_config}; @@ -154,7 +150,7 @@ mod region { #[test] fn test_get_region_local_state() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); cluster.run(); @@ -169,7 +165,7 @@ mod region { iter_ffi_helpers( &cluster, None, - &mut |id: u64, _, ffi_set: &mut FFIHelperSet| { + &mut |_id: u64, _, ffi_set: &mut FFIHelperSet| { let f = ffi_set.proxy_helper.fn_get_region_local_state.unwrap(); let mut state = kvproto::raft_serverpb::RegionLocalState::default(); let mut error_msg = mock_engine_store::RawCppStringPtrGuard::default(); @@ -374,7 +370,7 @@ mod write { fn test_interaction() { // TODO Maybe we should pick this test to TiKV. // This test is to check if empty entries can affect pre_exec and post_exec. 
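// Background: `pre_exec` hooks let an observer skip execution of an admin or write
// command entirely, while `post_exec` hooks run right after a command is applied and can
// request an immediate persistence; the checks below verify that empty raft entries do
// not confuse either hook.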
- let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); fail::cfg("try_flush_data", "return(0)").unwrap(); let _ = cluster.run(); @@ -414,6 +410,9 @@ mod write { } std::thread::sleep(std::time::Duration::from_millis(100)); retry += 1; + if retry >= 30 { + panic!("states is not changed") + } }; for i in prev_states.keys() { @@ -461,7 +460,7 @@ mod write { fn test_leadership_change_impl(filter: bool) { // Test if a empty command can be observed when leadership changes. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); disable_auto_gen_compact_log(&mut cluster); @@ -538,7 +537,7 @@ mod write { #[test] fn test_kv_write_always_persist() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); let _ = cluster.run(); @@ -576,7 +575,7 @@ mod write { #[test] fn test_kv_write() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); fail::cfg("on_post_exec_normal", "return(false)").unwrap(); fail::cfg("on_post_exec_admin", "return(false)").unwrap(); @@ -709,7 +708,7 @@ mod write { #[test] fn test_consistency_check() { // ComputeHash and VerifyHash shall be filtered. - let (mut cluster, pd_client) = new_mock_cluster(0, 2); + let (mut cluster, _pd_client) = new_mock_cluster(0, 2); cluster.run(); @@ -738,7 +737,7 @@ mod write { // If we just return None for CompactLog, the region state in ApplyFsm will change. // Because there is no rollback in new implementation. // This is a ERROR state. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); cluster.run(); // We don't return Persist after handling CompactLog. @@ -762,7 +761,7 @@ mod write { let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let res = cluster + let _ = cluster .call_command_on_leader(req, Duration::from_secs(3)) .unwrap(); @@ -789,7 +788,7 @@ mod write { #[test] fn test_compact_log() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); disable_auto_gen_compact_log(&mut cluster); @@ -891,7 +890,7 @@ mod write { #[test] fn test_empty_cmd() { // Test if a empty command can be observed when leadership changes. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); // Disable compact log cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); @@ -1004,14 +1003,14 @@ mod ingest { // copy file to save dir. 
let src = sst_path.clone(); let dst = file.get_import_path().save.to_str().unwrap(); - std::fs::copy(src.clone(), dst); + let _ = std::fs::copy(src.clone(), dst); (file.get_import_path().save.clone(), meta, sst_path) } #[test] fn test_handle_ingest_sst() { - let (mut cluster, pd_client) = new_mock_cluster(0, 1); + let (mut cluster, _pd_client) = new_mock_cluster(0, 1); let _ = cluster.run(); let key = "k"; @@ -1044,7 +1043,7 @@ mod ingest { #[test] fn test_invalid_ingest_sst() { - let (mut cluster, pd_client) = new_mock_cluster(0, 1); + let (mut cluster, _pd_client) = new_mock_cluster(0, 1); let _ = cluster.run(); @@ -1080,7 +1079,7 @@ mod ingest { #[test] fn test_ingest_return_none() { - let (mut cluster, pd_client) = new_mock_cluster(0, 1); + let (mut cluster, _pd_client) = new_mock_cluster(0, 1); disable_auto_gen_compact_log(&mut cluster); @@ -1109,7 +1108,7 @@ mod ingest { let req = new_ingest_sst_cmd(meta1); let _ = cluster.request(b"k1", vec![req], false, Duration::from_secs(5), true); - let (file5, meta5, sst_path5) = make_sst( + let (file5, meta5, _sst_path5) = make_sst( &cluster, region5.get_id(), region5.get_region_epoch().clone(), @@ -1283,7 +1282,7 @@ mod restart { #[test] fn test_kv_restart() { // Test if a empty command can be observed when leadership changes. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); // Disable AUTO generated compact log. disable_auto_gen_compact_log(&mut cluster); @@ -1306,7 +1305,7 @@ mod restart { let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); fail::cfg("try_flush_data", "return(1)").unwrap(); - let res = cluster + let _ = cluster .call_command_on_leader(req, Duration::from_secs(3)) .unwrap(); @@ -1524,13 +1523,13 @@ mod snapshot { .wl() .add_recv_filter(3, Box::new(CollectSnapshotFilter::new(tx))); pd_client.must_add_peer(r1, new_peer(3, 3)); - let region = cluster.get_region(b"k1"); // Ensure the snapshot of range ("", "") is sent and piled in filter. if let Err(e) = rx.recv_timeout(Duration::from_secs(1)) { panic!("the snapshot is not sent before split, e: {:?}", e); } // Occasionally fails. + // let region1 = cluster.get_region(b"k1"); // // Split the region range and then there should be another snapshot for the split ranges. // cluster.must_split(®ion, b"k2"); // check_key(&cluster, b"k3", b"v3", None, Some(true), Some(vec![3])); @@ -1551,7 +1550,7 @@ mod snapshot { // Disable default max peer count check. pd_client.disable_default_operator(); - let r1 = cluster.run_conf_change(); + let _ = cluster.run_conf_change(); for i in 0..count { let k = format!("k{:0>4}", 2 * i + 1); let v = format!("v{}", 2 * i + 1); @@ -1565,7 +1564,6 @@ mod snapshot { let region = cluster.get_region(k.as_bytes()); let sp = format!("k{:0>4}", 2 * i + 2); cluster.must_split(®ion, sp.as_bytes()); - let region = cluster.get_region(k.as_bytes()); } (cluster, pd_client) @@ -1673,7 +1671,7 @@ mod snapshot { }); pd_client.must_merge(r1_new.get_id(), r3_new.get_id()); - let r1_new2 = cluster.get_region(b"k1"); + let _r1_new2 = cluster.get_region(b"k1"); let r3_new2 = cluster.get_region(b"k3"); iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| { @@ -1718,7 +1716,7 @@ mod snapshot { // Disable default max peer count check. 
pd_client.disable_default_operator(); - let r1 = cluster.run_conf_change(); + let _ = cluster.run_conf_change(); cluster.must_put(b"k1", b"v1"); cluster.must_put(b"k3", b"v3"); diff --git a/tests/proxy/proxy.rs b/tests/proxy/proxy.rs index 8b9c4cb7f2a..d9ac1290016 100644 --- a/tests/proxy/proxy.rs +++ b/tests/proxy/proxy.rs @@ -14,9 +14,8 @@ use std::{ use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; // use engine_store_ffi::config::{ensure_no_common_unrecognized_keys, ProxyConfig}; use engine_traits::{ - Error, ExternalSstFileInfo, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, - SstExt, SstReader, SstWriter, SstWriterBuilder, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, - CF_RAFT, CF_WRITE, + Error, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, SstExt, SstWriter, + WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; use kvproto::{ raft_cmdpb::{AdminCmdType, AdminRequest}, @@ -30,7 +29,6 @@ use new_mock_engine_store::{ }, Cluster, ProxyConfig, Simulator, TestPdClient, }; -use pd_client::PdClient; use proxy_server::config::{ensure_no_common_unrecognized_keys, validate_and_persist_config}; use raft::eraftpb::MessageType; use raftstore::{ @@ -41,9 +39,8 @@ use sst_importer::SstImporter; pub use test_raftstore::{must_get_equal, must_get_none, new_peer}; use tikv::config::TiKvConfig; use tikv_util::{ - config::{LogFormat, ReadableDuration, ReadableSize}, + config::{ReadableDuration, ReadableSize}, time::Duration, - HandyRwLock, }; // TODO Need refactor if moved to raft-engine @@ -301,7 +298,7 @@ pub fn disable_auto_gen_compact_log(cluster: &mut Cluster) { #[test] fn test_kv_write() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); cluster.cfg.proxy_compat = false; // No persist will be triggered by CompactLog From 715c1fd8fc005df5d67aedfac71e5d39bd7f3274 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 15 Sep 2022 10:58:59 +0800 Subject: [PATCH 2/2] raftstore: Implement coprocessor observer pre_persist (#12957) ref tikv/tikv#12849 Support coprocessor observer pre_commit Signed-off-by: CalvinNeo --- ci_check.sh | 1 + components/engine_store_ffi/src/observer.rs | 47 ++++++- components/proxy_server/src/config.rs | 6 +- components/proxy_server/src/run.rs | 2 +- components/proxy_server/src/setup.rs | 4 +- .../raftstore/src/coprocessor/dispatcher.rs | 34 ++++- components/raftstore/src/coprocessor/mod.rs | 49 ++++--- components/raftstore/src/store/fsm/apply.rs | 79 ++++++++--- components/test_raftstore/src/node.rs | 1 + components/test_raftstore/src/server.rs | 1 + new-mock-engine-store/src/lib.rs | 9 +- new-mock-engine-store/src/mock_cluster.rs | 23 +-- tests/proxy/normal.rs | 133 +++++++++++++++--- tests/proxy/proxy.rs | 37 ++--- 14 files changed, 290 insertions(+), 136 deletions(-) diff --git a/ci_check.sh b/ci_check.sh index ced40cb2e51..742a374447f 100755 --- a/ci_check.sh +++ b/ci_check.sh @@ -31,6 +31,7 @@ elif [[ $M == "testnew" ]]; then cargo test --package tests --test proxy normal::ingest cargo test --package tests --test proxy normal::snapshot cargo test --package tests --test proxy normal::restart + cargo test --package tests --test proxy normal::persist # tests based on new-mock-engine-store, for some tests not available for new proxy cargo test --package tests --test proxy proxy elif [[ $M == "debug" ]]; then diff --git a/components/engine_store_ffi/src/observer.rs b/components/engine_store_ffi/src/observer.rs index 4769208136e..f9736f6c32c 100644 --- 
a/components/engine_store_ffi/src/observer.rs +++ b/components/engine_store_ffi/src/observer.rs @@ -9,17 +9,13 @@ use std::{ use collections::HashMap; use engine_tiflash::FsStatsExt; -use engine_traits::{CfName, SstMetaInfo}; +use engine_traits::SstMetaInfo; use kvproto::{ - import_sstpb::SstMeta, metapb::Region, - raft_cmdpb::{ - AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest, - RaftCmdRequest, RaftCmdResponse, Request, - }, + raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, CmdType, RaftCmdRequest}, raft_serverpb::RaftApplyState, }; -use raft::{eraftpb, StateRole}; +use raft::StateRole; use raftstore::{ coprocessor, coprocessor::{ @@ -583,6 +579,43 @@ impl RegionChangeObserver for TiFlashObserver { .handle_destroy(ob_ctx.region().get_id()); } } + fn pre_persist( + &self, + ob_ctx: &mut ObserverContext<'_>, + is_finished: bool, + cmd: Option<&RaftCmdRequest>, + ) -> bool { + let should_persist = if is_finished { + fail::fail_point!("on_pre_persist_with_finish", |_| { true }); + false + } else { + let cmd = cmd.unwrap(); + if cmd.has_admin_request() { + match cmd.get_admin_request().get_cmd_type() { + // Merge needs to get the latest apply index. + AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => true, + _ => false, + } + } else { + false + } + }; + if should_persist { + info!( + "observe pre_persist, persist"; + "region_id" => ob_ctx.region().get_id(), + "peer_id" => self.peer_id, + ); + } else { + debug!( + "observe pre_persist"; + "region_id" => ob_ctx.region().get_id(), + "peer_id" => self.peer_id, + "is_finished" => is_finished, + ); + }; + should_persist + } } impl PdTaskObserver for TiFlashObserver { diff --git a/components/proxy_server/src/config.rs b/components/proxy_server/src/config.rs index e2d6bd0e4e5..630e886a8e8 100644 --- a/components/proxy_server/src/config.rs +++ b/components/proxy_server/src/config.rs @@ -1,10 +1,6 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
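For reference on the engine_store_ffi observer.rs hunk above: pre_persist only takes effect once the observer is registered as a region-change observer and the apply path consults the CoprocessorHost. A minimal wiring sketch, assuming the registry API used by the apply tests later in this patch; the priority value, function name, and module paths are assumptions, not part of the patch.

    use engine_store_ffi::observer::TiFlashObserver; // path assumed from the file layout
    use raftstore::coprocessor::{BoxRegionChangeObserver, CoprocessorHost};

    fn wire_pre_persist<E: engine_traits::KvEngine>(
        host: &mut CoprocessorHost<E>,
        observer: TiFlashObserver,
        region: &kvproto::metapb::Region,
        cmd: &kvproto::raft_cmdpb::RaftCmdRequest,
    ) -> bool {
        // Register so that CoprocessorHost::pre_persist (added in dispatcher.rs below)
        // consults this observer.
        host.registry
            .register_region_change_observer(1, BoxRegionChangeObserver::new(observer));
        // In the apply path, is_finished == false passes the pending command so that
        // CommitMerge / RollbackMerge can force an early flush; any observer returning
        // false vetoes the persistence.
        host.pre_persist(region, false, Some(cmd))
    }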
-use std::{ - collections::{hash_map::RandomState, HashSet}, - iter::FromIterator, - path::Path, -}; +use std::{collections::HashSet, iter::FromIterator, path::Path}; use itertools::Itertools; use online_config::OnlineConfig; diff --git a/components/proxy_server/src/run.rs b/components/proxy_server/src/run.rs index 5d2c5557412..03169a5b19d 100644 --- a/components/proxy_server/src/run.rs +++ b/components/proxy_server/src/run.rs @@ -8,7 +8,7 @@ use std::{ path::{Path, PathBuf}, str::FromStr, sync::{ - atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering}, + atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering}, mpsc, Arc, Mutex, }, thread, diff --git a/components/proxy_server/src/setup.rs b/components/proxy_server/src/setup.rs index 0254f9e4501..96e94bbc0a9 100644 --- a/components/proxy_server/src/setup.rs +++ b/components/proxy_server/src/setup.rs @@ -6,9 +6,9 @@ use clap::ArgMatches; use collections::HashMap; pub use server::setup::initial_logger; use tikv::config::{MetricConfig, TiKvConfig}; -use tikv_util::{self, config, logger}; +use tikv_util::{self, logger}; -use crate::config::{validate_and_persist_config, ProxyConfig}; +use crate::config::ProxyConfig; pub use crate::fatal; #[allow(dead_code)] diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 9d09c314cf0..ea166431313 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -457,10 +457,11 @@ impl CoprocessorHost { } } - /// `post_exec` should be called immediately after we executed one raft command. - /// It notifies observers side effects of this command before execution of the next command, - /// including req/resp, apply state, modified region state, etc. - /// Return true observers think a persistence is necessary. + /// `post_exec` should be called immediately after we executed one raft + /// command. It notifies observers side effects of this command before + /// execution of the next command, including req/resp, apply state, + /// modified region state, etc. Return true observers think a + /// persistence is necessary. pub fn post_exec( &self, region: &Region, @@ -633,6 +634,26 @@ impl CoprocessorHost { ); } + /// `pre_persist` is called we we want to persist data or meta for a region. + /// For example, in `finish_for` and `commit`, + /// we will separately call `pre_persist` with is_finished = true/false. + /// By returning false, we reject this persistence. 
+ pub fn pre_persist( + &self, + region: &Region, + is_finished: bool, + cmd: Option<&RaftCmdRequest>, + ) -> bool { + let mut ctx = ObserverContext::new(region); + for observer in &self.registry.region_change_observers { + let observer = observer.observer.inner(); + if !observer.pre_persist(&mut ctx, is_finished, cmd) { + return false; + } + } + true + } + pub fn on_flush_applied_cmd_batch( &self, max_level: ObserveLevel, @@ -927,11 +948,10 @@ mod tests { _: u64, _: &crate::store::SnapKey, _: Option<&Snapshot>, - ) -> Result<()> { + ) { self.called .fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); - Ok(()) } fn should_pre_apply_snapshot(&self) -> bool { @@ -1100,7 +1120,7 @@ mod tests { index += ObserverIndex::PreApplySnapshot as usize; assert_all!([&ob.called], &[index]); - let _ = host.post_apply_snapshot(®ion, 0, &key, None); + host.post_apply_snapshot(®ion, 0, &key, None); index += ObserverIndex::PostApplySnapshot as usize; assert_all!([&ob.called], &[index]); diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 77112b21d19..ef61ff967a8 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole}; pub mod config; mod consistency_check; pub mod dispatcher; -pub mod error; +mod error; mod metrics; pub mod region_info_accessor; mod split_check; @@ -103,6 +103,18 @@ pub trait AdminObserver: Coprocessor { /// For now, the `region` in `ObserverContext` is an empty region. fn post_apply_admin(&self, _: &mut ObserverContext<'_>, _: &AdminResponse) {} + /// Hook before exec admin request, returns whether we should skip this + /// admin. + fn pre_exec_admin( + &self, + _: &mut ObserverContext<'_>, + _: &AdminRequest, + _: u64, + _: u64, + ) -> bool { + false + } + /// Hook to call immediately after exec command /// Will be a special persistence after this exec if a observer returns true. fn post_exec_admin( @@ -115,18 +127,6 @@ pub trait AdminObserver: Coprocessor { ) -> bool { false } - - /// Hook before exec admin request, returns whether we should skip this - /// admin. - fn pre_exec_admin( - &self, - _: &mut ObserverContext<'_>, - _: &AdminRequest, - _: u64, - _: u64, - ) -> bool { - false - } } pub trait QueryObserver: Coprocessor { @@ -147,6 +147,12 @@ pub trait QueryObserver: Coprocessor { /// For now, the `region` in `ObserverContext` is an empty region. fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) {} + /// Hook before exec write request, returns whether we should skip this + /// write. + fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool { + false + } + /// Hook to call immediately after exec command. /// Will be a special persistence after this exec if a observer returns true. fn post_exec_query( @@ -159,12 +165,6 @@ pub trait QueryObserver: Coprocessor { ) -> bool { false } - - /// Hook before exec write request, returns whether we should skip this - /// write. 
- fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool { - false - } } pub trait ApplySnapshotObserver: Coprocessor { @@ -307,6 +307,17 @@ pub enum RegionChangeEvent { pub trait RegionChangeObserver: Coprocessor { /// Hook to call when a region changed on this TiKV fn on_region_changed(&self, _: &mut ObserverContext<'_>, _: RegionChangeEvent, _: StateRole) {} + + /// Should be called everytime before we write a WriteBatch into + /// KvEngine. Returns false if we can't commit at this time. + fn pre_persist( + &self, + _: &mut ObserverContext<'_>, + _is_finished: bool, + _cmd: Option<&RaftCmdRequest>, + ) -> bool { + true + } } #[derive(Clone, Debug, Default)] diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index a03b1a48af7..c180a4b84fb 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -594,15 +594,18 @@ where delegate: &mut ApplyDelegate, results: VecDeque>, ) { - #[cfg(any(test, feature = "testexport"))] - { - if cfg!(feature = "compat_old_proxy") { - if !delegate.pending_remove { - delegate.write_apply_state(self.kv_wb_mut()); - } + if self.host.pre_persist(&delegate.region, true, None) { + if !delegate.pending_remove { + delegate.write_apply_state(self.kv_wb_mut()); } + self.commit_opt(delegate, false); + } else { + debug!("do not persist when finish_for"; + "region" => ?delegate.region, + "tag" => &delegate.tag, + "apply_state" => ?delegate.apply_state, + ); } - self.commit_opt(delegate, false); self.apply_res.push(ApplyRes { region_id: delegate.region_id(), apply_state: delegate.apply_state.clone(), @@ -717,17 +720,6 @@ pub fn notify_stale_req_with_msg(term: u64, msg: String, cb: Callback bool { - if cmd.has_admin_request() { - match cmd.get_admin_request().get_cmd_type() { - // Merge needs to get the latest apply index. - AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => return true, - _ => {} - } - } - return false; -} - /// Checks if a write is needed to be issued before handling the command. fn should_write_to_engine(cmd: &RaftCmdRequest) -> bool { if cmd.has_admin_request() { @@ -1096,8 +1088,15 @@ where return ApplyResult::Yield; } } - if should_flush_to_engine(&cmd) { - apply_ctx.commit_opt(self, true); + let mut has_unflushed_data = + self.last_flush_applied_index != self.apply_state.get_applied_index(); + if (has_unflushed_data && should_write_to_engine(&cmd) + || apply_ctx.kv_wb().should_write_to_engine()) + && apply_ctx.host.pre_persist(&self.region, false, Some(&cmd)) + { + // TODO(tiflash) may write apply state twice here. + // Originally use only `commit_opt`. 
+ apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.saturating_elapsed() >= apply_ctx.yield_duration { return ApplyResult::Yield; @@ -4983,6 +4982,7 @@ mod tests { cmd_sink: Option>>>, filter_compact_log: Arc, filter_consistency_check: Arc, + skip_persist_when_pre_commit: Arc, delay_remove_ssts: Arc, last_delete_sst_count: Arc, last_pending_delete_sst_count: Arc, @@ -5106,6 +5106,17 @@ mod tests { fn on_applied_current_term(&self, _: raft::StateRole, _: &Region) {} } + impl RegionChangeObserver for ApplyObserver { + fn pre_persist( + &self, + _: &mut ObserverContext<'_>, + _is_finished: bool, + _cmd: Option<&RaftCmdRequest>, + ) -> bool { + !self.skip_persist_when_pre_commit.load(Ordering::SeqCst) + } + } + #[test] fn test_handle_raft_committed_entries() { let (_path, engine) = create_tmp_engine("test-delegate"); @@ -5725,6 +5736,8 @@ mod tests { let obs = ApplyObserver::default(); host.registry .register_admin_observer(1, BoxAdminObserver::new(obs.clone())); + host.registry + .register_region_change_observer(1, BoxRegionChangeObserver::new(obs.clone())); host.registry .register_query_observer(1, BoxQueryObserver::new(obs.clone())); @@ -5760,6 +5773,8 @@ mod tests { reg.region.mut_region_epoch().set_version(3); router.schedule_task(1, Msg::Registration(reg)); + obs.skip_persist_when_pre_commit + .store(true, Ordering::SeqCst); let mut index_id = 1; let put_entry = EntryBuilder::new(index_id, 1) .put(b"k1", b"v1") @@ -5768,7 +5783,19 @@ mod tests { .epoch(1, 3) .build(); router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![put_entry], vec![]))); - fetch_apply_res(&rx); + let apply_res = fetch_apply_res(&rx); + + // We don't persist at `finish_for`, since we disabled `pre_persist`. + let state: RaftApplyState = engine + .get_msg_cf(CF_RAFT, &keys::apply_state_key(1)) + .unwrap() + .unwrap_or_default(); + assert_eq!( + apply_res.apply_state.get_applied_index(), + state.get_applied_index() + 1 + ); + obs.skip_persist_when_pre_commit + .store(false, Ordering::SeqCst); // Phase 1: we test if pre_exec will filter execution of commands correctly. index_id += 1; @@ -5790,6 +5817,16 @@ mod tests { assert_eq!(apply_res.exec_res.len(), 0); assert_eq!(apply_res.apply_state.get_truncated_state().get_index(), 0); + // We persist at `finish_for`, since we enabled `pre_persist`. 
+ let state: RaftApplyState = engine + .get_msg_cf(CF_RAFT, &keys::apply_state_key(1)) + .unwrap() + .unwrap_or_default(); + assert_eq!( + apply_res.apply_state.get_applied_index(), + state.get_applied_index() + ); + index_id += 1; // Don't filter CompactLog obs.filter_compact_log.store(false, Ordering::SeqCst); diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index db87f850a09..3c973b92f66 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -262,6 +262,7 @@ impl Simulator for NodeCluster { .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) .max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0) + .enable_multi_snapshot_files(true) .build(tmp.path().to_str().unwrap()); (snap_mgr, Some(tmp)) } else { diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index c059f6937eb..cc75cb27c8e 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -429,6 +429,7 @@ impl ServerCluster { .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) .max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0) + .enable_multi_snapshot_files(true) .build(tmp_str); self.snap_mgrs.insert(node_id, snap_mgr.clone()); let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index 4205d44e744..ac718ff6ec0 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -9,7 +9,6 @@ use std::{ time::Duration, }; -use engine_rocks::RocksEngine; pub use engine_store_ffi::{ interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper, RawCppPtr, UnwrapExternCFunc, @@ -18,10 +17,8 @@ use engine_traits::{ Engines, Iterable, Peekable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; use kvproto::{ - raft_cmdpb::{AdminCmdType, AdminRequest}, - raft_serverpb::{ - MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState, - }, + raft_cmdpb::AdminCmdType, + raft_serverpb::{RaftApplyState, RegionLocalState}, }; pub use mock_cluster::{Cluster, ProxyConfig, Simulator, TestPdClient, TiFlashEngine}; use protobuf::Message; @@ -840,7 +837,7 @@ impl ProxyNotifier { { let lock = self.mutex.lock().unwrap(); if !self.flag.load(std::sync::atomic::Ordering::Acquire) { - self.cv.wait_timeout(lock, timeout); + let _ = self.cv.wait_timeout(lock, timeout); } } self.flag.store(false, std::sync::atomic::Ordering::Release); diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 8af738c1527..ba491d4333b 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -1,13 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
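For reference on the apply.rs test hunks above: both new assertions follow the same read-back pattern, which can be factored into a small helper. A sketch assuming only the Peekable trait and the keys crate already used in those hunks; the helper itself is not part of the patch.

    use engine_traits::{Peekable, CF_RAFT};
    use kvproto::raft_serverpb::RaftApplyState;

    // Read the apply state that actually reached disk for `region_id`. The tests compare
    // this against the in-memory index carried by ApplyRes: when pre_persist returns
    // false the on-disk index lags by one, when it returns true the two match.
    fn on_disk_apply_state<E: Peekable>(engine: &E, region_id: u64) -> RaftApplyState {
        engine
            .get_msg_cf(CF_RAFT, &keys::apply_state_key(region_id))
            .unwrap()
            .unwrap_or_default()
    }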
-#![feature(slice_take)] - use std::{ - borrow::BorrowMut, - cell::RefCell, - collections::{hash_map::Entry as MapEntry, BTreeMap}, - path::Path, - pin::Pin, + collections::hash_map::Entry as MapEntry, result, sync::{atomic::AtomicU8, Arc, Mutex, RwLock}, thread, @@ -22,19 +16,17 @@ pub use engine_store_ffi::{ interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper, RawCppPtr, TiFlashEngine, UnwrapExternCFunc, }; -use engine_traits::{Engines, KvEngine, SyncMutable, CF_DEFAULT, CF_LOCK, CF_WRITE}; +use engine_traits::{Engines, KvEngine, CF_DEFAULT}; use file_system::IORateLimiter; use futures::executor::block_on; use kvproto::{ errorpb::Error as PbError, - metapb::{self, Buckets, PeerRole, RegionEpoch, StoreLabel}, + metapb::{self, PeerRole, RegionEpoch, StoreLabel}, raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, Request, *}, raft_serverpb::RaftMessage, }; -use lazy_static::lazy_static; use pd_client::PdClient; pub use proxy_server::config::ProxyConfig; -use proxy_server::fatal; use raftstore::{ store::{ bootstrap_store, @@ -58,12 +50,9 @@ pub use test_raftstore::{ new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config, new_transfer_leader_cmd, sleep_ms, TestPdClient, }; -use tikv::{ - config::TiKvConfig, - server::{Node, Result as ServerResult}, -}; +use tikv::{config::TiKvConfig, server::Result as ServerResult}; use tikv_util::{ - crit, debug, error, info, safe_panic, + debug, error, safe_panic, sys::SysQuota, thread_group::GroupProperties, time::{Instant, ThreadReadId}, @@ -259,7 +248,7 @@ impl> Cluster { key_manager: &Option>, router: &Option>, ) { - let (mut ffi_helper_set, node_cfg) = + let (mut ffi_helper_set, _node_cfg) = self.make_ffi_helper_set(0, engines, key_manager, router); // We can not use moved or cloned engines any more. diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 14096b6c801..9893b9abaf9 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -1,28 +1,20 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
use std::{ - collections::HashMap, - io::{self, Read, Write}, - ops::{Deref, DerefMut}, + io::Write, + ops::DerefMut, path::{Path, PathBuf}, str::FromStr, - sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Arc, Once, RwLock, - }, + sync::{atomic::Ordering, mpsc, Arc}, }; -use clap::{App, Arg, ArgMatches}; +use clap::{App, Arg}; use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; -use engine_traits::{ - Error, ExternalSstFileInfo, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, - SstExt, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, -}; +use engine_traits::MiscExt; use kvproto::{ import_sstpb::SstMeta, metapb::RegionEpoch, - raft_cmdpb::{AdminCmdType, AdminRequest, CmdType, Request}, - raft_serverpb::{RaftApplyState, RegionLocalState, StoreIdent}, + raft_cmdpb::{CmdType, Request}, }; use new_mock_engine_store::{ config::Config, @@ -37,20 +29,18 @@ use pd_client::PdClient; use proxy_server::{ config::{ address_proxy_config, ensure_no_common_unrecognized_keys, get_last_config, - make_tikv_config, setup_default_tikv_config, validate_and_persist_config, - TIFLASH_DEFAULT_LISTENING_ADDR, TIFLASH_DEFAULT_STATUS_ADDR, + setup_default_tikv_config, validate_and_persist_config, TIFLASH_DEFAULT_LISTENING_ADDR, + TIFLASH_DEFAULT_STATUS_ADDR, }, proxy::gen_tikv_config, - run::run_tikv_proxy, }; use raft::eraftpb::MessageType; -use raftstore::{coprocessor::ConsistencyCheckMethod, store::util::find_peer}; +use raftstore::store::util::find_peer; use sst_importer::SstImporter; pub use test_raftstore::{must_get_equal, must_get_none, new_peer}; -use test_raftstore::{new_node_cluster, new_tikv_config}; use tikv::config::{TiKvConfig, LAST_CONFIG_FILE}; use tikv_util::{ - config::{LogFormat, ReadableDuration, ReadableSize}, + config::{ReadableDuration, ReadableSize}, time::Duration, HandyRwLock, }; @@ -949,9 +939,7 @@ mod write { } mod ingest { - use tempfile::TempDir; use test_sst_importer::gen_sst_file_with_kvs; - use txn_types::TimeStamp; use super::*; @@ -1782,3 +1770,104 @@ mod snapshot { cluster.shutdown(); } } + +mod persist { + use super::*; + + #[test] + fn test_persist_when_finish() { + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); + disable_auto_gen_compact_log(&mut cluster); + + cluster.run(); + cluster.must_put(b"k0", b"v0"); + check_key(&cluster, b"k0", b"v0", Some(true), Some(false), None); + let region_id = cluster.get_region(b"k0").get_id(); + + let prev_states = collect_all_states(&cluster, region_id); + cluster.must_put(b"k1", b"v1"); + check_key(&cluster, b"k1", b"v1", Some(true), Some(false), None); + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_eq!( + old.in_memory_apply_state.get_applied_index() + 1, + new.in_memory_apply_state.get_applied_index() + ); + assert_eq!( + old.in_disk_apply_state.get_applied_index(), + new.in_disk_apply_state.get_applied_index() + ); + } + + fail::cfg("on_pre_persist_with_finish", "return").unwrap(); + cluster.must_put(b"k2", b"v2"); + // Because we flush when batch ends. + check_key(&cluster, b"k2", b"v2", Some(true), Some(false), None); + + // TODO(tiflash) wait `write_apply_state` in raftstore. + std::thread::sleep(std::time::Duration::from_millis(1000)); + let prev_states = collect_all_states(&cluster, region_id); + cluster.must_put(b"k3", b"v3"); + // Because we flush when batch ends. 
+ check_key(&cluster, b"k3", b"v3", Some(true), Some(false), None); + + // TODO(tiflash) wait `write_apply_state` in raftstore. + std::thread::sleep(std::time::Duration::from_millis(1000)); + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + let gap = new.in_memory_apply_state.get_applied_index() + - old.in_memory_apply_state.get_applied_index(); + let gap2 = new.in_disk_apply_state.get_applied_index() + - old.in_disk_apply_state.get_applied_index(); + assert_eq!(gap, gap2); + } + fail::remove("on_pre_persist_with_finish"); + } + + #[test] + fn test_persist_when_merge() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // disable_auto_gen_compact_log(&mut cluster); + cluster.cfg.raft_store.right_derive_when_split = false; + + cluster.run(); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + check_key(&cluster, b"k1", b"v1", Some(true), None, None); + check_key(&cluster, b"k3", b"v3", Some(true), None, None); + + let r1 = cluster.get_region(b"k1"); + cluster.must_split(&r1, b"k2"); + let r3 = cluster.get_region(b"k3"); + + std::thread::sleep(std::time::Duration::from_millis(1000)); + let prev_states = collect_all_states(&cluster, r3.get_id()); + + info!("start merge"; "from" => r1.get_id(), "to" => r3.get_id()); + pd_client.must_merge(r1.get_id(), r3.get_id()); + + // TODO(tiflash) wait `write_apply_state` in raftstore. + std::thread::sleep(std::time::Duration::from_millis(1000)); + let r3_new = cluster.get_region(b"k3"); + assert_eq!(r3_new.get_id(), r3.get_id()); + let new_states = collect_all_states(&cluster, r3_new.get_id()); + // index 6 empty command + // index 7 CommitMerge + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + let _gap = new.in_memory_apply_state.get_applied_index() + - old.in_memory_apply_state.get_applied_index(); + let gap2 = new.in_disk_apply_state.get_applied_index() + - old.in_disk_apply_state.get_applied_index(); + assert_eq!(gap2, 2); + } + } +} diff --git a/tests/proxy/proxy.rs b/tests/proxy/proxy.rs index d9ac1290016..709de704319 100644 --- a/tests/proxy/proxy.rs +++ b/tests/proxy/proxy.rs @@ -2,42 +2,20 @@ use std::{ collections::HashMap, - io::{self, Read, Write}, - ops::{Deref, DerefMut}, - path::Path, - sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Arc, Once, RwLock, - }, + sync::{Arc, RwLock}, }; -use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; // use engine_store_ffi::config::{ensure_no_common_unrecognized_keys, ProxyConfig}; -use engine_traits::{ - Error, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, SstExt, SstWriter, - WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, -}; +use engine_traits::{Peekable, CF_RAFT}; use kvproto::{ raft_cmdpb::{AdminCmdType, AdminRequest}, raft_serverpb::{RaftApplyState, RegionLocalState, StoreIdent}, }; use new_mock_engine_store::{ - mock_cluster::FFIHelperSet, - node::NodeCluster, - transport_simulate::{ - CloneFilterFactory, CollectSnapshotFilter, Direction, RegionPacketFilter, - }, - Cluster, ProxyConfig, Simulator, TestPdClient, -}; -use proxy_server::config::{ensure_no_common_unrecognized_keys, validate_and_persist_config}; -use raft::eraftpb::MessageType; -use raftstore::{ - coprocessor::{ConsistencyCheckMethod, Coprocessor}, - store::util::find_peer, + mock_cluster::FFIHelperSet, node::NodeCluster, Cluster, ProxyConfig, TestPdClient, }; -use 
sst_importer::SstImporter; +use raftstore::coprocessor::ConsistencyCheckMethod; pub use test_raftstore::{must_get_equal, must_get_none, new_peer}; -use tikv::config::TiKvConfig; use tikv_util::{ config::{ReadableDuration, ReadableSize}, time::Duration, @@ -82,6 +60,7 @@ pub fn new_verify_hash_request(hash: Vec, index: u64) -> AdminRequest { req } +#[derive(Debug)] pub struct States { pub in_memory_apply_state: RaftApplyState, pub in_memory_applied_term: u64, @@ -291,9 +270,9 @@ pub fn disable_auto_gen_compact_log(cluster: &mut Cluster) { // Disable AUTO generated compact log. // This will not totally disable, so we use some failpoints later. cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(100000); + cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500000); + cluster.cfg.raft_store.raft_log_gc_threshold = 10000; } #[test]
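For reference, the persist tests added in tests/proxy/normal.rs drive the new hook through the "on_pre_persist_with_finish" failpoint introduced in observer.rs. An abbreviated outline of that pattern, with the writes and state checks elided; the failpoint name matches the patch, the test name and body are illustrative only.

    #[test]
    fn pre_persist_failpoint_outline() {
        // Make pre_persist(is_finished = true) return `true`, forcing a persist at
        // finish_for instead of the TiFlash default of skipping it.
        fail::cfg("on_pre_persist_with_finish", "return").unwrap();

        // ... put keys, then compare in-memory and on-disk applied indexes via
        // collect_all_states(), as test_persist_when_finish does above ...

        fail::remove("on_pre_persist_with_finish");
    }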