tests: speed up tests with batch put and mem disk (tikv#11249)
* tests: speed up tests with batch put and mem disk

Ref tikv#5637.

Signed-off-by: Jay Lee <[email protected]>

* detect memory disk

There are many heavy writes in raftstore tests; I have seen some write IO
tasks take several seconds to finish, which will certainly make most
test cases time out. This PR utilizes a memory disk to speed up those
operations.
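
For example (a sketch: the mount point is an assumption; the env
variable and helper come from this change):

    // Point the test suite at a tmpfs mount before creating clusters.
    std::env::set_var("TIKV_TEST_MEMORY_DISK_MOUNT_POINT", "/dev/shm");
    // Temp dirs requested with prefer_mem = true then live in memory.
    let dir = test_util::temp_dir("test_cluster", true);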

Ref tikv#11177

Signed-off-by: Jay Lee <[email protected]>

* also put snapshot in mem

Signed-off-by: Jay Lee <[email protected]>

* wait a little longer

Signed-off-by: Jay Lee <[email protected]>

* fix inspected IO on mem disk

Signed-off-by: Jay Lee <[email protected]>

* fix build

Signed-off-by: Jay Lee <[email protected]>

* fix regression brought by pessimistic pipeline

Signed-off-by: Jay Lee <[email protected]>

* fix regression from pre-transfer-leader

Signed-off-by: Jay Lee <[email protected]>

* introduce test config

Signed-off-by: Jay Lee <[email protected]>

* clean up and wait more on test_merge_cascade_merge_isolated

Signed-off-by: Jay Lee <[email protected]>

* fix panic in encryption

Signed-off-by: Jay Lee <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
BusyJay and ti-chi-bot authored Nov 5, 2021
1 parent 2060fd4 commit cfc1cd9
Showing 21 changed files with 126 additions and 82 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -39,4 +39,7 @@ fuzz-incremental/
/last_tikv.toml
/raft/
core.*
*.info

# Ignore all logs
*.log
3 changes: 1 addition & 2 deletions components/encryption/src/encrypted_file/mod.rs
@@ -69,8 +69,7 @@ impl<'a> EncryptedFile<'a> {
let mut tmp_file = OpenOptions::new()
.create(true)
.write(true)
.open(&tmp_path)
.unwrap_or_else(|e| panic!("EncryptedFile::write {:?}: {}", &tmp_path.to_str(), e));
.open(&tmp_path)?;

// Encrypt the content.
let encrypted_content = master_key
14 changes: 6 additions & 8 deletions components/encryption/src/manager/mod.rs
@@ -691,14 +691,12 @@ impl DataKeyManager {

impl Drop for DataKeyManager {
fn drop(&mut self) {
self.rotate_terminal
.send(())
.expect("DataKeyManager drop send");
self.background_worker
.take()
.expect("DataKeyManager worker take")
.join()
.expect("DataKeyManager worker join");
if let Err(e) = self.rotate_terminal.send(()) {
info!("failed to terminate background rotation, are we shutting down?"; "err" => %e);
}
if let Some(Err(e)) = self.background_worker.take().map(|w| w.join()) {
info!("failed to join background rotation, are we shutting down?"; "err" => ?e);
}
}
}

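The Drop change above replaces panicking expects with log-and-continue:
during shutdown the rotation worker may already be gone, so failing to
signal or join it is not fatal. A minimal sketch of the pattern (names
are illustrative, not from the codebase):

    use std::sync::mpsc;
    use std::thread;

    struct Background {
        stop_tx: mpsc::Sender<()>,
        handle: Option<thread::JoinHandle<()>>,
    }

    impl Drop for Background {
        fn drop(&mut self) {
            // The receiver may already be dropped during shutdown; just log.
            if let Err(e) = self.stop_tx.send(()) {
                eprintln!("failed to signal worker, shutting down? {}", e);
            }
            // join() errs only if the worker panicked; never panic in drop.
            if let Some(Err(e)) = self.handle.take().map(|h| h.join()) {
                eprintln!("failed to join worker: {:?}", e);
            }
        }
    }
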
8 changes: 5 additions & 3 deletions components/raftstore/src/store/config.rs
@@ -78,7 +78,12 @@ pub struct Config {
pub raft_engine_purge_interval: ReadableDuration,
// When a peer is not responding for this time, leader will not keep entry cache for it.
pub raft_entry_cache_life_time: ReadableDuration,
// Deprecated! This configuration has no effect.
// It is preserved only for compatibility checks.
// When a peer is newly added, reject transferring leader to the peer for a while.
#[doc(hidden)]
#[serde(skip_serializing)]
#[online_config(skip)]
pub raft_reject_transfer_leader_duration: ReadableDuration,

// Interval (ms) to check region whether need to be split or not.
@@ -565,9 +570,6 @@ impl Config {
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["raft_entry_cache_life_time"])
.set(self.raft_entry_cache_life_time.as_secs() as f64);
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["raft_reject_transfer_leader_duration"])
.set(self.raft_reject_transfer_leader_duration.as_secs() as f64);

CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["split_region_check_tick_interval"])
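The attributes added above are the standard deprecation pattern: the
field still deserializes, so old configuration files keep parsing, but
it is never serialized back, and online_config(skip) (TiKV's own
attribute) excludes it from online reconfiguration. A stripped-down
sketch assuming serde derive:

    use serde::{Deserialize, Serialize};
    use tikv_util::config::ReadableDuration;

    #[derive(Serialize, Deserialize)]
    #[serde(rename_all = "kebab-case")]
    struct Cfg {
        // Accepted on input for compatibility, never emitted on output.
        #[doc(hidden)]
        #[serde(skip_serializing)]
        raft_reject_transfer_leader_duration: ReadableDuration,
    }
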
8 changes: 5 additions & 3 deletions components/raftstore/src/store/fsm/apply.rs
@@ -1035,9 +1035,11 @@ where
apply_ctx: &mut ApplyContext<EK, W>,
entry: &Entry,
) -> ApplyResult<EK::Snapshot> {
fail_point!("yield_apply_1000", self.region_id() == 1000, |_| {
ApplyResult::Yield
});
fail_point!(
"yield_apply_first_region",
self.region.get_start_key().is_empty() && !self.region.get_end_key().is_empty(),
|_| ApplyResult::Yield
);

let index = entry.get_index();
let term = entry.get_term();
11 changes: 7 additions & 4 deletions components/test_raftstore/src/cluster.rs
@@ -17,6 +17,7 @@ use kvproto::raft_serverpb::{
use raft::eraftpb::ConfChangeType;
use tempfile::TempDir;

use crate::Config;
use collections::{HashMap, HashSet};
use encryption_export::DataKeyManager;
use engine_rocks::raw::DB;
@@ -32,7 +33,6 @@ use raftstore::store::fsm::{create_raft_batch_system, RaftBatchSystem, RaftRoute
use raftstore::store::transport::CasualRouter;
use raftstore::store::*;
use raftstore::{Error, Result};
use tikv::config::TiKvConfig;
use tikv::server::Result as ServerResult;
use tikv_util::thread_group::GroupProperties;
use tikv_util::time::Instant;
@@ -55,7 +55,7 @@ pub trait Simulator {
fn run_node(
&mut self,
node_id: u64,
cfg: TiKvConfig,
cfg: Config,
engines: Engines<RocksEngine, RocksEngine>,
store_meta: Arc<Mutex<StoreMeta>>,
key_manager: Option<Arc<DataKeyManager>>,
@@ -136,7 +136,7 @@ pub trait Simulator {
}

pub struct Cluster<T: Simulator> {
pub cfg: TiKvConfig,
pub cfg: Config,
leaders: HashMap<u64, metapb::Peer>,
pub count: usize,

@@ -164,7 +164,7 @@ impl<T: Simulator> Cluster<T> {
) -> Cluster<T> {
// TODO: In the future, maybe it's better to test both case where `use_delete_range` is true and false
Cluster {
cfg: new_tikv_config(id),
cfg: Config {
tikv: new_tikv_config(id),
prefer_mem: true,
},
leaders: HashMap::default(),
count,
paths: vec![],
1 change: 0 additions & 1 deletion components/test_raftstore/src/common-test.toml
@@ -65,7 +65,6 @@ raft-store-max-leader-lease = "240ms"
allow-remove-leader = true
merge-check-tick-interval = "100ms"
pd-heartbeat-tick-interval = "20ms"
raft-reject-transfer-leader-duration = "0s"
dev-assert = true
hibernate-regions = true

25 changes: 25 additions & 0 deletions components/test_raftstore/src/config.rs
@@ -0,0 +1,25 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use std::ops::{Deref, DerefMut};
use tikv::config::TiKvConfig;

#[derive(Clone)]
pub struct Config {
pub tikv: TiKvConfig,
pub prefer_mem: bool,
}

impl Deref for Config {
type Target = TiKvConfig;
#[inline]
fn deref(&self) -> &TiKvConfig {
&self.tikv
}
}

impl DerefMut for Config {
#[inline]
fn deref_mut(&mut self) -> &mut TiKvConfig {
&mut self.tikv
}
}
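
Because Config derefs to TiKvConfig, existing tests that read or mutate
TiKvConfig fields through cluster.cfg keep compiling unchanged; only
code that needs the wrapper's own flag touches prefer_mem. A usage
sketch (the two raft_store fields are taken from the test config
elsewhere in this diff):

    use test_raftstore::Config;
    use tikv::config::TiKvConfig;
    use tikv_util::config::ReadableDuration;

    fn example() {
        let mut cfg = Config {
            tikv: TiKvConfig::default(),
            prefer_mem: true,
        };
        // Deref: read nested TiKvConfig fields directly.
        let _tick = cfg.raft_store.pd_heartbeat_tick_interval;
        // DerefMut: mutate them in place, exactly as tests did before.
        cfg.raft_store.merge_check_tick_interval = ReadableDuration::millis(100);
    }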
2 changes: 2 additions & 0 deletions components/test_raftstore/src/lib.rs
@@ -7,6 +7,7 @@ extern crate tikv_util;
extern crate pd_client;

mod cluster;
mod config;
mod node;
mod pd;
mod router;
@@ -15,6 +16,7 @@ mod transport_simulate;
mod util;

pub use crate::cluster::*;
pub use crate::config::Config;
pub use crate::node::*;
pub use crate::pd::*;
pub use crate::router::*;
13 changes: 7 additions & 6 deletions components/test_raftstore/src/node.rs
@@ -3,7 +3,7 @@
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock};

use tempfile::{Builder, TempDir};
use tempfile::TempDir;

use kvproto::metapb;
use kvproto::raft_cmdpb::*;
@@ -12,6 +12,7 @@ use raft::eraftpb::MessageType;
use raft::SnapshotStatus;

use super::*;
use crate::Config;
use collections::{HashMap, HashSet};
use concurrency_manager::ConcurrencyManager;
use encryption_export::DataKeyManager;
@@ -27,7 +28,7 @@ use raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use raftstore::store::SnapManagerBuilder;
use raftstore::store::*;
use raftstore::Result;
use tikv::config::{ConfigController, Module, TiKvConfig};
use tikv::config::{ConfigController, Module};
use tikv::import::SSTImporter;
use tikv::server::raftkv::ReplicaReadLockChecker;
use tikv::server::Node;
@@ -199,7 +200,7 @@ impl Simulator for NodeCluster {
fn run_node(
&mut self,
node_id: u64,
cfg: TiKvConfig,
cfg: Config,
engines: Engines<RocksEngine, RocksEngine>,
store_meta: Arc<Mutex<StoreMeta>>,
key_manager: Option<Arc<DataKeyManager>>,
@@ -232,7 +233,7 @@
.snap_paths
.contains_key(&node_id)
{
let tmp = Builder::new().prefix("test_cluster").tempdir().unwrap();
let tmp = test_util::temp_dir("test_cluster", cfg.prefer_mem);
let snap_mgr = SnapManagerBuilder::default()
.max_write_bytes_per_sec(cfg.server.snap_max_write_bytes_per_sec.0 as i64)
.max_total_size(cfg.server.snap_max_total_size.0)
@@ -264,7 +265,7 @@
};

let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone());
let cfg_controller = ConfigController::new(cfg.clone());
let cfg_controller = ConfigController::new(cfg.tikv.clone());

let split_check_runner =
SplitCheckRunner::new(engines.kv.clone(), router.clone(), coprocessor_host.clone());
Expand All @@ -274,7 +275,7 @@ impl Simulator for NodeCluster {
Box::new(SplitCheckConfigManager(split_scheduler.clone())),
);

let mut raftstore_cfg = cfg.raft_store;
let mut raftstore_cfg = cfg.tikv.raft_store;
raftstore_cfg.validate().unwrap();
let raft_store = Arc::new(VersionTrack::new(raftstore_cfg));
cfg_controller.register(
16 changes: 7 additions & 9 deletions components/test_raftstore/src/server.rs
@@ -15,10 +15,11 @@ use kvproto::metapb;
use kvproto::raft_cmdpb::*;
use kvproto::raft_serverpb;
use kvproto::tikvpb::TikvClient;
use tempfile::{Builder, TempDir};
use tempfile::TempDir;
use tokio::runtime::Builder as TokioBuilder;

use super::*;
use crate::Config;
use collections::{HashMap, HashSet};
use concurrency_manager::ConcurrencyManager;
use encryption_export::DataKeyManager;
@@ -57,10 +58,7 @@ use tikv::server::{
};
use tikv::storage;
use tikv::storage::txn::flow_controller::FlowController;
use tikv::{
config::{ConfigController, TiKvConfig},
server::raftkv::ReplicaReadLockChecker,
};
use tikv::{config::ConfigController, server::raftkv::ReplicaReadLockChecker};
use tikv_util::config::VersionTrack;
use tikv_util::time::ThreadReadId;
use tikv_util::worker::{Builder as WorkerBuilder, LazyWorker};
@@ -208,15 +206,15 @@ impl Simulator for ServerCluster {
fn run_node(
&mut self,
node_id: u64,
mut cfg: TiKvConfig,
mut cfg: Config,
engines: Engines<RocksEngine, RocksEngine>,
store_meta: Arc<Mutex<StoreMeta>>,
key_manager: Option<Arc<DataKeyManager>>,
router: RaftRouter<RocksEngine, RocksEngine>,
system: RaftBatchSystem<RocksEngine, RocksEngine>,
) -> ServerResult<u64> {
let (tmp_str, tmp) = if node_id == 0 || !self.snap_paths.contains_key(&node_id) {
let p = Builder::new().prefix("test_cluster").tempdir().unwrap();
let p = test_util::temp_dir("test_cluster", cfg.prefer_mem);
(p.path().to_str().unwrap().to_owned(), Some(p))
} else {
let p = self.snap_paths[&node_id].path().to_str().unwrap();
@@ -445,12 +443,12 @@ impl Simulator for ServerCluster {
// Register the role change observer of the lock manager.
lock_mgr.register_detector_role_change_observer(&mut coprocessor_host);

let pessimistic_txn_cfg = cfg.pessimistic_txn;
let pessimistic_txn_cfg = cfg.tikv.pessimistic_txn;

let split_check_runner =
SplitCheckRunner::new(engines.kv.clone(), router.clone(), coprocessor_host.clone());
let split_check_scheduler = bg_worker.start("split-check", split_check_runner);
let split_config_manager = SplitConfigManager(Arc::new(VersionTrack::new(cfg.split)));
let split_config_manager = SplitConfigManager(Arc::new(VersionTrack::new(cfg.tikv.split)));
let auto_split_controller = AutoSplitController::new(split_config_manager);
node.start(
engines,
11 changes: 4 additions & 7 deletions components/test_raftstore/src/util.rs
@@ -6,6 +6,7 @@ use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

use crate::Config;
use encryption_export::{
data_key_manager_from_config, DataKeyManager, FileConfig, MasterKeyConfig,
};
@@ -36,7 +37,7 @@ use raftstore::store::fsm::RaftRouter;
use raftstore::store::*;
use raftstore::Result;
use rand::RngCore;
use tempfile::{Builder, TempDir};
use tempfile::TempDir;
use tikv::config::*;
use tikv::storage::point_key_range;
use tikv_util::config::*;
@@ -606,13 +607,13 @@ pub fn create_test_engine(
// TODO: pass it in for all cases.
router: Option<RaftRouter<RocksEngine, RocksEngine>>,
limiter: Option<Arc<IORateLimiter>>,
cfg: &TiKvConfig,
cfg: &Config,
) -> (
Engines<RocksEngine, RocksEngine>,
Option<Arc<DataKeyManager>>,
TempDir,
) {
let dir = Builder::new().prefix("test_cluster").tempdir().unwrap();
let dir = test_util::temp_dir("test_cluster", cfg.prefer_mem);
let key_manager =
data_key_manager_from_config(&cfg.security.encryption, dir.path().to_str().unwrap())
.unwrap()
@@ -709,10 +710,6 @@ pub fn ignore_merge_target_integrity<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.pd_client.ignore_merge_target_integrity();
}

pub fn configure_for_transfer_leader<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.raft_reject_transfer_leader_duration = ReadableDuration::secs(1);
}

pub fn configure_for_lease_read<T: Simulator>(
cluster: &mut Cluster<T>,
base_tick_ms: Option<u64>,
20 changes: 20 additions & 0 deletions components/test_util/src/lib.rs
@@ -93,3 +93,23 @@ pub fn alloc_port() -> u16 {
}
}
}

static MEM_DISK: &str = "TIKV_TEST_MEMORY_DISK_MOUNT_POINT";

/// Gets a temporary path. The directory will be removed when dropped.
///
/// The returned path points into memory only when a memory disk is
/// available and `prefer_mem` is set.
pub fn temp_dir(prefix: impl Into<Option<&'static str>>, prefer_mem: bool) -> tempfile::TempDir {
let mut builder = tempfile::Builder::new();
if let Some(prefix) = prefix.into() {
builder.prefix(prefix);
}
match env::var(MEM_DISK) {
Ok(dir) if prefer_mem => {
debug!("using memory disk"; "path" => %dir);
builder.tempdir_in(dir).unwrap()
}
_ => builder.tempdir().unwrap(),
}
}
4 changes: 2 additions & 2 deletions tests/failpoints/cases/test_merge.rs
@@ -778,8 +778,8 @@ fn test_node_merge_cascade_merge_with_apply_yield() {

pd_client.must_merge(r2.get_id(), r1.get_id());
assert_eq!(r1.get_id(), 1000);
let yield_apply_1000_fp = "yield_apply_1000";
fail::cfg(yield_apply_1000_fp, "80%3*return()").unwrap();
let yield_apply_first_region_fp = "yield_apply_first_region";
fail::cfg(yield_apply_first_region_fp, "80%3*return()").unwrap();

for i in 0..10 {
cluster.must_put(format!("k{}", i).as_bytes(), b"v2");
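For reference, "80%3*return()" uses the fail crate's pct%cnt*action
syntax: an 80% chance of firing, at most three times in total, with the
return action that makes the renamed failpoint yield. A lifecycle
sketch under the fail crate's standard API:

    // Arm the failpoint: 80% probability, at most 3 triggers, then no-op.
    fail::cfg("yield_apply_first_region", "80%3*return()").unwrap();
    // ... drive writes so apply hits the first region and sometimes yields ...
    // Disarm so later tests are unaffected.
    fail::remove("yield_apply_first_region");
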
1 change: 0 additions & 1 deletion tests/integrations/config/test-custom.toml
@@ -147,7 +147,6 @@ raft-log-gc-size-limit = "1KB"
raft-log-reserve-max-ticks = 100
raft-engine-purge-interval = "20m"
raft-entry-cache-life-time = "12s"
raft-reject-transfer-leader-duration = "3s"
split-region-check-tick-interval = "12s"
region-split-check-diff = "20MB"
region-compact-check-interval = "12s"
5 changes: 4 additions & 1 deletion tests/integrations/import/util.rs
@@ -22,7 +22,10 @@ const CLEANUP_SST_MILLIS: u64 = 10;
pub fn new_cluster(cfg: TiKvConfig) -> (Cluster<ServerCluster>, Context) {
let count = 1;
let mut cluster = new_server_cluster(0, count);
cluster.cfg = cfg;
cluster.cfg = Config {
tikv: cfg,
prefer_mem: true,
};
cluster.run();

let region_id = 1;