From 255b5df0895aec007b29593c5bb36681eeb099d3 Mon Sep 17 00:00:00 2001
From: Calvin Neo
Date: Thu, 15 Sep 2022 10:58:59 +0800
Subject: [PATCH] raftstore: Implement coprocessor observer pre_persist (#12957)

ref tikv/tikv#12849

Support coprocessor observer pre_persist

Signed-off-by: CalvinNeo
---
 ci_check.sh                                   |   1 +
 components/engine_store_ffi/src/observer.rs   |  47 ++++++-
 .../raftstore/src/coprocessor/dispatcher.rs   |  34 ++++-
 components/raftstore/src/coprocessor/mod.rs   |  49 ++++---
 components/raftstore/src/store/fsm/apply.rs   |  79 ++++++++---
 components/test_raftstore/src/node.rs         |   1 +
 components/test_raftstore/src/server.rs       |   1 +
 new-mock-engine-store/src/lib.rs              |   7 +-
 new-mock-engine-store/src/mock_cluster.rs     |  19 +--
 tests/proxy/normal.rs                         | 133 +++++++++++++++---
 tests/proxy/proxy.rs                          |  37 ++---
 11 files changed, 284 insertions(+), 124 deletions(-)

diff --git a/ci_check.sh b/ci_check.sh
index ced40cb2e517..742a374447f3 100755
--- a/ci_check.sh
+++ b/ci_check.sh
@@ -31,6 +31,7 @@ elif [[ $M == "testnew" ]]; then
   cargo test --package tests --test proxy normal::ingest
   cargo test --package tests --test proxy normal::snapshot
   cargo test --package tests --test proxy normal::restart
+  cargo test --package tests --test proxy normal::persist
   # tests based on new-mock-engine-store, for some tests not available for new proxy
   cargo test --package tests --test proxy proxy
 elif [[ $M == "debug" ]]; then
diff --git a/components/engine_store_ffi/src/observer.rs b/components/engine_store_ffi/src/observer.rs
index 4769208136e0..f9736f6c32cb 100644
--- a/components/engine_store_ffi/src/observer.rs
+++ b/components/engine_store_ffi/src/observer.rs
@@ -9,17 +9,13 @@ use std::{
 
 use collections::HashMap;
 use engine_tiflash::FsStatsExt;
-use engine_traits::{CfName, SstMetaInfo};
+use engine_traits::SstMetaInfo;
 use kvproto::{
-    import_sstpb::SstMeta,
     metapb::Region,
-    raft_cmdpb::{
-        AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
-        RaftCmdRequest, RaftCmdResponse, Request,
-    },
+    raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, CmdType, RaftCmdRequest},
     raft_serverpb::RaftApplyState,
 };
-use raft::{eraftpb, StateRole};
+use raft::StateRole;
 use raftstore::{
     coprocessor,
     coprocessor::{
@@ -583,6 +579,43 @@ impl RegionChangeObserver for TiFlashObserver {
             .handle_destroy(ob_ctx.region().get_id());
         }
     }
+    fn pre_persist(
+        &self,
+        ob_ctx: &mut ObserverContext<'_>,
+        is_finished: bool,
+        cmd: Option<&RaftCmdRequest>,
+    ) -> bool {
+        let should_persist = if is_finished {
+            fail::fail_point!("on_pre_persist_with_finish", |_| { true });
+            false
+        } else {
+            let cmd = cmd.unwrap();
+            if cmd.has_admin_request() {
+                match cmd.get_admin_request().get_cmd_type() {
+                    // Merge needs to get the latest apply index.
+                    AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => true,
+                    _ => false,
+                }
+            } else {
+                false
+            }
+        };
+        if should_persist {
+            info!(
+                "observe pre_persist, persist";
+                "region_id" => ob_ctx.region().get_id(),
+                "peer_id" => self.peer_id,
+            );
+        } else {
+            debug!(
+                "observe pre_persist";
+                "region_id" => ob_ctx.region().get_id(),
+                "peer_id" => self.peer_id,
+                "is_finished" => is_finished,
+            );
+        };
+        should_persist
+    }
 }
 
 impl PdTaskObserver for TiFlashObserver {
diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs
index 9d09c314cf0e..ea166431313d 100644
--- a/components/raftstore/src/coprocessor/dispatcher.rs
+++ b/components/raftstore/src/coprocessor/dispatcher.rs
@@ -457,10 +457,11 @@ impl CoprocessorHost {
         }
     }
 
-    /// `post_exec` should be called immediately after we executed one raft command.
-    /// It notifies observers side effects of this command before execution of the next command,
-    /// including req/resp, apply state, modified region state, etc.
-    /// Return true observers think a persistence is necessary.
+    /// `post_exec` should be called immediately after we executed one raft
+    /// command. It notifies observers of the side effects of this command
+    /// before execution of the next command, including req/resp, apply state,
+    /// modified region state, etc. Returns true if observers think a
+    /// persistence is necessary.
     pub fn post_exec(
         &self,
         region: &Region,
@@ -633,6 +634,26 @@ impl CoprocessorHost {
         );
     }
 
+    /// `pre_persist` is called when we want to persist data or meta for a
+    /// region. For example, in `finish_for` and `commit`,
+    /// we will separately call `pre_persist` with is_finished = true/false.
+    /// By returning false, we reject this persistence.
+    pub fn pre_persist(
+        &self,
+        region: &Region,
+        is_finished: bool,
+        cmd: Option<&RaftCmdRequest>,
+    ) -> bool {
+        let mut ctx = ObserverContext::new(region);
+        for observer in &self.registry.region_change_observers {
+            let observer = observer.observer.inner();
+            if !observer.pre_persist(&mut ctx, is_finished, cmd) {
+                return false;
+            }
+        }
+        true
+    }
+
     pub fn on_flush_applied_cmd_batch(
         &self,
         max_level: ObserveLevel,
@@ -927,11 +948,10 @@ mod tests {
             _: u64,
             _: &crate::store::SnapKey,
             _: Option<&Snapshot>,
-        ) -> Result<()> {
+        ) {
             self.called
                 .fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst);
             ctx.bypass = self.bypass.load(Ordering::SeqCst);
-            Ok(())
         }
 
         fn should_pre_apply_snapshot(&self) -> bool {
@@ -1100,7 +1120,7 @@ mod tests {
         index += ObserverIndex::PreApplySnapshot as usize;
         assert_all!([&ob.called], &[index]);
 
-        let _ = host.post_apply_snapshot(&region, 0, &key, None);
+        host.post_apply_snapshot(&region, 0, &key, None);
         index += ObserverIndex::PostApplySnapshot as usize;
         assert_all!([&ob.called], &[index]);
 
diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs
index 77112b21d19d..ef61ff967a89 100644
--- a/components/raftstore/src/coprocessor/mod.rs
+++ b/components/raftstore/src/coprocessor/mod.rs
@@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole};
 pub mod config;
 mod consistency_check;
 pub mod dispatcher;
-pub mod error;
+mod error;
 mod metrics;
 pub mod region_info_accessor;
 mod split_check;
@@ -103,6 +103,18 @@ pub trait AdminObserver: Coprocessor {
     /// For now, the `region` in `ObserverContext` is an empty region.
     fn post_apply_admin(&self, _: &mut ObserverContext<'_>, _: &AdminResponse) {}
 
+    /// Hook before exec admin request, returns whether we should skip this
+    /// admin.
+ fn pre_exec_admin( + &self, + _: &mut ObserverContext<'_>, + _: &AdminRequest, + _: u64, + _: u64, + ) -> bool { + false + } + /// Hook to call immediately after exec command /// Will be a special persistence after this exec if a observer returns true. fn post_exec_admin( @@ -115,18 +127,6 @@ pub trait AdminObserver: Coprocessor { ) -> bool { false } - - /// Hook before exec admin request, returns whether we should skip this - /// admin. - fn pre_exec_admin( - &self, - _: &mut ObserverContext<'_>, - _: &AdminRequest, - _: u64, - _: u64, - ) -> bool { - false - } } pub trait QueryObserver: Coprocessor { @@ -147,6 +147,12 @@ pub trait QueryObserver: Coprocessor { /// For now, the `region` in `ObserverContext` is an empty region. fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) {} + /// Hook before exec write request, returns whether we should skip this + /// write. + fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool { + false + } + /// Hook to call immediately after exec command. /// Will be a special persistence after this exec if a observer returns true. fn post_exec_query( @@ -159,12 +165,6 @@ pub trait QueryObserver: Coprocessor { ) -> bool { false } - - /// Hook before exec write request, returns whether we should skip this - /// write. - fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool { - false - } } pub trait ApplySnapshotObserver: Coprocessor { @@ -307,6 +307,17 @@ pub enum RegionChangeEvent { pub trait RegionChangeObserver: Coprocessor { /// Hook to call when a region changed on this TiKV fn on_region_changed(&self, _: &mut ObserverContext<'_>, _: RegionChangeEvent, _: StateRole) {} + + /// Should be called everytime before we write a WriteBatch into + /// KvEngine. Returns false if we can't commit at this time. + fn pre_persist( + &self, + _: &mut ObserverContext<'_>, + _is_finished: bool, + _cmd: Option<&RaftCmdRequest>, + ) -> bool { + true + } } #[derive(Clone, Debug, Default)] diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index a03b1a48af77..c180a4b84fb2 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -594,15 +594,18 @@ where delegate: &mut ApplyDelegate, results: VecDeque>, ) { - #[cfg(any(test, feature = "testexport"))] - { - if cfg!(feature = "compat_old_proxy") { - if !delegate.pending_remove { - delegate.write_apply_state(self.kv_wb_mut()); - } + if self.host.pre_persist(&delegate.region, true, None) { + if !delegate.pending_remove { + delegate.write_apply_state(self.kv_wb_mut()); } + self.commit_opt(delegate, false); + } else { + debug!("do not persist when finish_for"; + "region" => ?delegate.region, + "tag" => &delegate.tag, + "apply_state" => ?delegate.apply_state, + ); } - self.commit_opt(delegate, false); self.apply_res.push(ApplyRes { region_id: delegate.region_id(), apply_state: delegate.apply_state.clone(), @@ -717,17 +720,6 @@ pub fn notify_stale_req_with_msg(term: u64, msg: String, cb: Callback bool { - if cmd.has_admin_request() { - match cmd.get_admin_request().get_cmd_type() { - // Merge needs to get the latest apply index. - AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => return true, - _ => {} - } - } - return false; -} - /// Checks if a write is needed to be issued before handling the command. 
fn should_write_to_engine(cmd: &RaftCmdRequest) -> bool { if cmd.has_admin_request() { @@ -1096,8 +1088,15 @@ where return ApplyResult::Yield; } } - if should_flush_to_engine(&cmd) { - apply_ctx.commit_opt(self, true); + let mut has_unflushed_data = + self.last_flush_applied_index != self.apply_state.get_applied_index(); + if (has_unflushed_data && should_write_to_engine(&cmd) + || apply_ctx.kv_wb().should_write_to_engine()) + && apply_ctx.host.pre_persist(&self.region, false, Some(&cmd)) + { + // TODO(tiflash) may write apply state twice here. + // Originally use only `commit_opt`. + apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.saturating_elapsed() >= apply_ctx.yield_duration { return ApplyResult::Yield; @@ -4983,6 +4982,7 @@ mod tests { cmd_sink: Option>>>, filter_compact_log: Arc, filter_consistency_check: Arc, + skip_persist_when_pre_commit: Arc, delay_remove_ssts: Arc, last_delete_sst_count: Arc, last_pending_delete_sst_count: Arc, @@ -5106,6 +5106,17 @@ mod tests { fn on_applied_current_term(&self, _: raft::StateRole, _: &Region) {} } + impl RegionChangeObserver for ApplyObserver { + fn pre_persist( + &self, + _: &mut ObserverContext<'_>, + _is_finished: bool, + _cmd: Option<&RaftCmdRequest>, + ) -> bool { + !self.skip_persist_when_pre_commit.load(Ordering::SeqCst) + } + } + #[test] fn test_handle_raft_committed_entries() { let (_path, engine) = create_tmp_engine("test-delegate"); @@ -5725,6 +5736,8 @@ mod tests { let obs = ApplyObserver::default(); host.registry .register_admin_observer(1, BoxAdminObserver::new(obs.clone())); + host.registry + .register_region_change_observer(1, BoxRegionChangeObserver::new(obs.clone())); host.registry .register_query_observer(1, BoxQueryObserver::new(obs.clone())); @@ -5760,6 +5773,8 @@ mod tests { reg.region.mut_region_epoch().set_version(3); router.schedule_task(1, Msg::Registration(reg)); + obs.skip_persist_when_pre_commit + .store(true, Ordering::SeqCst); let mut index_id = 1; let put_entry = EntryBuilder::new(index_id, 1) .put(b"k1", b"v1") @@ -5768,7 +5783,19 @@ mod tests { .epoch(1, 3) .build(); router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![put_entry], vec![]))); - fetch_apply_res(&rx); + let apply_res = fetch_apply_res(&rx); + + // We don't persist at `finish_for`, since we disabled `pre_persist`. + let state: RaftApplyState = engine + .get_msg_cf(CF_RAFT, &keys::apply_state_key(1)) + .unwrap() + .unwrap_or_default(); + assert_eq!( + apply_res.apply_state.get_applied_index(), + state.get_applied_index() + 1 + ); + obs.skip_persist_when_pre_commit + .store(false, Ordering::SeqCst); // Phase 1: we test if pre_exec will filter execution of commands correctly. index_id += 1; @@ -5790,6 +5817,16 @@ mod tests { assert_eq!(apply_res.exec_res.len(), 0); assert_eq!(apply_res.apply_state.get_truncated_state().get_index(), 0); + // We persist at `finish_for`, since we enabled `pre_persist`. 
+ let state: RaftApplyState = engine + .get_msg_cf(CF_RAFT, &keys::apply_state_key(1)) + .unwrap() + .unwrap_or_default(); + assert_eq!( + apply_res.apply_state.get_applied_index(), + state.get_applied_index() + ); + index_id += 1; // Don't filter CompactLog obs.filter_compact_log.store(false, Ordering::SeqCst); diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index db87f850a09a..3c973b92f667 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -262,6 +262,7 @@ impl Simulator for NodeCluster { .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) .max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0) + .enable_multi_snapshot_files(true) .build(tmp.path().to_str().unwrap()); (snap_mgr, Some(tmp)) } else { diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index c059f6937ebc..cc75cb27c8ed 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -429,6 +429,7 @@ impl ServerCluster { .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) .max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0) + .enable_multi_snapshot_files(true) .build(tmp_str); self.snap_mgrs.insert(node_id, snap_mgr.clone()); let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index 4205d44e7441..2776d715c435 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -9,7 +9,6 @@ use std::{ time::Duration, }; -use engine_rocks::RocksEngine; pub use engine_store_ffi::{ interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper, RawCppPtr, UnwrapExternCFunc, @@ -18,10 +17,8 @@ use engine_traits::{ Engines, Iterable, Peekable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; use kvproto::{ - raft_cmdpb::{AdminCmdType, AdminRequest}, - raft_serverpb::{ - MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState, - }, + raft_cmdpb::AdminCmdType, + raft_serverpb::{RaftApplyState, RegionLocalState}, }; pub use mock_cluster::{Cluster, ProxyConfig, Simulator, TestPdClient, TiFlashEngine}; use protobuf::Message; diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 8af738c15273..cc84726765b3 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -3,11 +3,7 @@ #![feature(slice_take)] use std::{ - borrow::BorrowMut, - cell::RefCell, - collections::{hash_map::Entry as MapEntry, BTreeMap}, - path::Path, - pin::Pin, + collections::hash_map::Entry as MapEntry, result, sync::{atomic::AtomicU8, Arc, Mutex, RwLock}, thread, @@ -22,19 +18,17 @@ pub use engine_store_ffi::{ interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper, RawCppPtr, TiFlashEngine, UnwrapExternCFunc, }; -use engine_traits::{Engines, KvEngine, SyncMutable, CF_DEFAULT, CF_LOCK, CF_WRITE}; +use engine_traits::{Engines, KvEngine, CF_DEFAULT}; use file_system::IORateLimiter; use futures::executor::block_on; use kvproto::{ errorpb::Error as PbError, - metapb::{self, Buckets, PeerRole, RegionEpoch, StoreLabel}, + metapb::{self, PeerRole, RegionEpoch, StoreLabel}, raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, Request, *}, raft_serverpb::RaftMessage, }; -use lazy_static::lazy_static; use 
pd_client::PdClient; pub use proxy_server::config::ProxyConfig; -use proxy_server::fatal; use raftstore::{ store::{ bootstrap_store, @@ -58,10 +52,7 @@ pub use test_raftstore::{ new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config, new_transfer_leader_cmd, sleep_ms, TestPdClient, }; -use tikv::{ - config::TiKvConfig, - server::{Node, Result as ServerResult}, -}; +use tikv::{config::TiKvConfig, server::Result as ServerResult}; use tikv_util::{ crit, debug, error, info, safe_panic, sys::SysQuota, @@ -259,7 +250,7 @@ impl> Cluster { key_manager: &Option>, router: &Option>, ) { - let (mut ffi_helper_set, node_cfg) = + let (mut ffi_helper_set, _node_cfg) = self.make_ffi_helper_set(0, engines, key_manager, router); // We can not use moved or cloned engines any more. diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 14096b6c801c..9893b9abaf9e 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -1,28 +1,20 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use std::{ - collections::HashMap, - io::{self, Read, Write}, - ops::{Deref, DerefMut}, + io::Write, + ops::DerefMut, path::{Path, PathBuf}, str::FromStr, - sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Arc, Once, RwLock, - }, + sync::{atomic::Ordering, mpsc, Arc}, }; -use clap::{App, Arg, ArgMatches}; +use clap::{App, Arg}; use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; -use engine_traits::{ - Error, ExternalSstFileInfo, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, - SstExt, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, -}; +use engine_traits::MiscExt; use kvproto::{ import_sstpb::SstMeta, metapb::RegionEpoch, - raft_cmdpb::{AdminCmdType, AdminRequest, CmdType, Request}, - raft_serverpb::{RaftApplyState, RegionLocalState, StoreIdent}, + raft_cmdpb::{CmdType, Request}, }; use new_mock_engine_store::{ config::Config, @@ -37,20 +29,18 @@ use pd_client::PdClient; use proxy_server::{ config::{ address_proxy_config, ensure_no_common_unrecognized_keys, get_last_config, - make_tikv_config, setup_default_tikv_config, validate_and_persist_config, - TIFLASH_DEFAULT_LISTENING_ADDR, TIFLASH_DEFAULT_STATUS_ADDR, + setup_default_tikv_config, validate_and_persist_config, TIFLASH_DEFAULT_LISTENING_ADDR, + TIFLASH_DEFAULT_STATUS_ADDR, }, proxy::gen_tikv_config, - run::run_tikv_proxy, }; use raft::eraftpb::MessageType; -use raftstore::{coprocessor::ConsistencyCheckMethod, store::util::find_peer}; +use raftstore::store::util::find_peer; use sst_importer::SstImporter; pub use test_raftstore::{must_get_equal, must_get_none, new_peer}; -use test_raftstore::{new_node_cluster, new_tikv_config}; use tikv::config::{TiKvConfig, LAST_CONFIG_FILE}; use tikv_util::{ - config::{LogFormat, ReadableDuration, ReadableSize}, + config::{ReadableDuration, ReadableSize}, time::Duration, HandyRwLock, }; @@ -949,9 +939,7 @@ mod write { } mod ingest { - use tempfile::TempDir; use test_sst_importer::gen_sst_file_with_kvs; - use txn_types::TimeStamp; use super::*; @@ -1782,3 +1770,104 @@ mod snapshot { cluster.shutdown(); } } + +mod persist { + use super::*; + + #[test] + fn test_persist_when_finish() { + let (mut cluster, _pd_client) = new_mock_cluster(0, 3); + disable_auto_gen_compact_log(&mut cluster); + + cluster.run(); + cluster.must_put(b"k0", b"v0"); + check_key(&cluster, b"k0", b"v0", Some(true), Some(false), None); + let region_id = cluster.get_region(b"k0").get_id(); + + let prev_states = collect_all_states(&cluster, region_id); + 
cluster.must_put(b"k1", b"v1"); + check_key(&cluster, b"k1", b"v1", Some(true), Some(false), None); + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_eq!( + old.in_memory_apply_state.get_applied_index() + 1, + new.in_memory_apply_state.get_applied_index() + ); + assert_eq!( + old.in_disk_apply_state.get_applied_index(), + new.in_disk_apply_state.get_applied_index() + ); + } + + fail::cfg("on_pre_persist_with_finish", "return").unwrap(); + cluster.must_put(b"k2", b"v2"); + // Because we flush when batch ends. + check_key(&cluster, b"k2", b"v2", Some(true), Some(false), None); + + // TODO(tiflash) wait `write_apply_state` in raftstore. + std::thread::sleep(std::time::Duration::from_millis(1000)); + let prev_states = collect_all_states(&cluster, region_id); + cluster.must_put(b"k3", b"v3"); + // Because we flush when batch ends. + check_key(&cluster, b"k3", b"v3", Some(true), Some(false), None); + + // TODO(tiflash) wait `write_apply_state` in raftstore. + std::thread::sleep(std::time::Duration::from_millis(1000)); + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + let gap = new.in_memory_apply_state.get_applied_index() + - old.in_memory_apply_state.get_applied_index(); + let gap2 = new.in_disk_apply_state.get_applied_index() + - old.in_disk_apply_state.get_applied_index(); + assert_eq!(gap, gap2); + } + fail::remove("on_pre_persist_with_finish"); + } + + #[test] + fn test_persist_when_merge() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // disable_auto_gen_compact_log(&mut cluster); + cluster.cfg.raft_store.right_derive_when_split = false; + + cluster.run(); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + check_key(&cluster, b"k1", b"v1", Some(true), None, None); + check_key(&cluster, b"k3", b"v3", Some(true), None, None); + + let r1 = cluster.get_region(b"k1"); + cluster.must_split(&r1, b"k2"); + let r3 = cluster.get_region(b"k3"); + + std::thread::sleep(std::time::Duration::from_millis(1000)); + let prev_states = collect_all_states(&cluster, r3.get_id()); + + info!("start merge"; "from" => r1.get_id(), "to" => r3.get_id()); + pd_client.must_merge(r1.get_id(), r3.get_id()); + + // TODO(tiflash) wait `write_apply_state` in raftstore. 
+ std::thread::sleep(std::time::Duration::from_millis(1000)); + let r3_new = cluster.get_region(b"k3"); + assert_eq!(r3_new.get_id(), r3.get_id()); + let new_states = collect_all_states(&cluster, r3_new.get_id()); + // index 6 empty command + // index 7 CommitMerge + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + let _gap = new.in_memory_apply_state.get_applied_index() + - old.in_memory_apply_state.get_applied_index(); + let gap2 = new.in_disk_apply_state.get_applied_index() + - old.in_disk_apply_state.get_applied_index(); + assert_eq!(gap2, 2); + } + } +} diff --git a/tests/proxy/proxy.rs b/tests/proxy/proxy.rs index d9ac1290016f..709de704319a 100644 --- a/tests/proxy/proxy.rs +++ b/tests/proxy/proxy.rs @@ -2,42 +2,20 @@ use std::{ collections::HashMap, - io::{self, Read, Write}, - ops::{Deref, DerefMut}, - path::Path, - sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Arc, Once, RwLock, - }, + sync::{Arc, RwLock}, }; -use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; // use engine_store_ffi::config::{ensure_no_common_unrecognized_keys, ProxyConfig}; -use engine_traits::{ - Error, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey, SstExt, SstWriter, - WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, -}; +use engine_traits::{Peekable, CF_RAFT}; use kvproto::{ raft_cmdpb::{AdminCmdType, AdminRequest}, raft_serverpb::{RaftApplyState, RegionLocalState, StoreIdent}, }; use new_mock_engine_store::{ - mock_cluster::FFIHelperSet, - node::NodeCluster, - transport_simulate::{ - CloneFilterFactory, CollectSnapshotFilter, Direction, RegionPacketFilter, - }, - Cluster, ProxyConfig, Simulator, TestPdClient, -}; -use proxy_server::config::{ensure_no_common_unrecognized_keys, validate_and_persist_config}; -use raft::eraftpb::MessageType; -use raftstore::{ - coprocessor::{ConsistencyCheckMethod, Coprocessor}, - store::util::find_peer, + mock_cluster::FFIHelperSet, node::NodeCluster, Cluster, ProxyConfig, TestPdClient, }; -use sst_importer::SstImporter; +use raftstore::coprocessor::ConsistencyCheckMethod; pub use test_raftstore::{must_get_equal, must_get_none, new_peer}; -use tikv::config::TiKvConfig; use tikv_util::{ config::{ReadableDuration, ReadableSize}, time::Duration, @@ -82,6 +60,7 @@ pub fn new_verify_hash_request(hash: Vec, index: u64) -> AdminRequest { req } +#[derive(Debug)] pub struct States { pub in_memory_apply_state: RaftApplyState, pub in_memory_applied_term: u64, @@ -291,9 +270,9 @@ pub fn disable_auto_gen_compact_log(cluster: &mut Cluster) { // Disable AUTO generated compact log. // This will not totally disable, so we use some failpoints later. cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(100000); + cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500000); + cluster.cfg.raft_store.raft_log_gc_threshold = 10000; } #[test]
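Note on the new hook (a minimal sketch, not part of the patch): any RegionChangeObserver can implement `pre_persist` to veto a particular apply-state write. The trait signature, the registration call, and the merge-only policy below mirror what this change adds in mod.rs, dispatcher.rs, and TiFlashObserver; `MyObserver`, `register_my_observer`, and the priority value 100 are hypothetical names used only for illustration.

use kvproto::raft_cmdpb::{AdminCmdType, RaftCmdRequest};
use raftstore::coprocessor::{
    BoxRegionChangeObserver, Coprocessor, CoprocessorHost, ObserverContext, RegionChangeObserver,
};

#[derive(Clone)]
struct MyObserver;

impl Coprocessor for MyObserver {}

impl RegionChangeObserver for MyObserver {
    // Called before the apply worker persists data or meta for this region.
    // Returning false rejects this particular persistence.
    fn pre_persist(
        &self,
        _ctx: &mut ObserverContext<'_>,
        is_finished: bool,
        cmd: Option<&RaftCmdRequest>,
    ) -> bool {
        if is_finished {
            // `finish_for` path: skip writing the apply state at batch end.
            return false;
        }
        // `commit` path: persist eagerly only for merge-related admin
        // commands, which need the latest apply index on disk.
        cmd.map_or(false, |c| {
            c.has_admin_request()
                && matches!(
                    c.get_admin_request().get_cmd_type(),
                    AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge
                )
        })
    }
}

// Hypothetical registration helper; in TiKV the CoprocessorHost is normally
// built during node/server start-up.
fn register_my_observer<E: engine_traits::KvEngine>(host: &mut CoprocessorHost<E>) {
    host.registry
        .register_region_change_observer(100, BoxRegionChangeObserver::new(MyObserver));
}

With the dispatcher change above, the apply worker consults every registered observer: `finish_for` calls `host.pre_persist(&region, true, None)` before writing the apply state, and the early-commit path calls `host.pre_persist(&region, false, Some(&cmd))`; if any observer returns false, that write is skipped.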